1use super::{
2 peer_addr::PeerAddr,
3 peer_info::PeerInfo,
4 peer_source::{ConnectionDirection, PeerSource},
5 peer_state::PeerState,
6 runtime_id::PublicRuntimeId,
7 stats::{ByteCounters, StatsTracker},
8};
9use crate::sync::{AwaitDrop, DropAwaitable, WatchSenderExt};
10use std::{
11 collections::{HashMap, hash_map::Entry},
12 fmt,
13 sync::{
14 Arc,
15 atomic::{AtomicU64, Ordering},
16 },
17 time::SystemTime,
18};
19use tokio::sync::watch;
20
/// Registry of peer connections.
///
/// The whole map lives inside a `watch` channel sender, so receivers obtained through
/// `subscribe` observe the same map.
pub(super) struct ConnectionSet {
    // One entry per (peer address, connection direction) pair.
    connections: watch::Sender<HashMap<ConnectionKey, ConnectionData>>,
}
25
26impl ConnectionSet {
27 pub fn new() -> Self {
28 Self {
29 connections: watch::Sender::new(HashMap::default()),
30 }
31 }
32
33 pub fn reserve(&self, addr: PeerAddr, source: PeerSource) -> ReserveResult {
38 let key = ConnectionKey {
39 addr,
40 dir: source.direction(),
41 };
42
43 self.connections
44 .send_if_modified_return(|connections| match connections.entry(key) {
45 Entry::Vacant(entry) => {
46 let id = ConnectionId::next();
47
48 entry.insert(ConnectionData {
49 id,
50 state: InnerPeerState::Known,
51 source,
52 stats_tracker: StatsTracker::default(),
53 on_release: DropAwaitable::new(),
54 });
55
56 (
57 true,
58 ReserveResult::Permit(ConnectionPermit {
59 connections: self.connections.clone(),
60 key,
61 id,
62 }),
63 )
64 }
65 Entry::Occupied(entry) => {
66 let data = entry.get();
67
68 (
69 false,
70 ReserveResult::Occupied(data.on_release.subscribe(), data.source, data.id),
71 )
72 }
73 })
74 }
75
76 pub fn peer_info_collector(&self) -> PeerInfoCollector {
77 PeerInfoCollector(self.connections.clone())
78 }
79
80 pub fn get_peer_info(&self, addr: PeerAddr) -> Option<PeerInfo> {
81 self.with(addr, |data| data.peer_info(addr))
82 }
83
84 pub fn get_peer_key(&self, addr: PeerAddr) -> Option<usize> {
85 self.with(addr, |data| match data.state {
86 InnerPeerState::Active { peer_key, .. } => Some(peer_key),
87 InnerPeerState::Known | InnerPeerState::Connecting | InnerPeerState::Handshaking => {
88 None
89 }
90 })
91 .flatten()
92 }
93
94 fn with<F, R>(&self, addr: PeerAddr, f: F) -> Option<R>
95 where
96 F: FnOnce(&ConnectionData) -> R,
97 {
98 let connections = self.connections.borrow();
99
100 connections
101 .get(&ConnectionKey {
102 addr,
103 dir: ConnectionDirection::Incoming,
104 })
105 .or_else(|| {
106 connections.get(&ConnectionKey {
107 addr,
108 dir: ConnectionDirection::Outgoing,
109 })
110 })
111 .map(f)
112 }
113
114 pub fn subscribe(&self) -> watch::Receiver<HashMap<ConnectionKey, ConnectionData>> {
115 self.connections.subscribe()
116 }
117}
118
/// Numeric identifier of a connection entry, handed out by `ConnectionId::next`.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
#[repr(transparent)]
pub(super) struct ConnectionId(u64);
125
126impl ConnectionId {
127 pub fn next() -> Self {
128 static NEXT: AtomicU64 = AtomicU64::new(0);
129 Self(NEXT.fetch_add(1, Ordering::Relaxed))
130 }
131}
132
133impl fmt::Display for ConnectionId {
134 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
135 write!(f, "{}", self.0)
136 }
137}
138
/// Outcome of `ConnectionSet::reserve`.
pub(super) enum ReserveResult {
    /// The slot was free; the permit now owns it.
    Permit(ConnectionPermit),
    /// The slot is already taken. Carries a subscription to the existing entry's release
    /// signal, plus that entry's source and id.
    Occupied(AwaitDrop, PeerSource, ConnectionId),
}
144
/// Cloneable handle onto the connection map that can produce `PeerInfo` snapshots
/// (see `collect`).
#[derive(Clone)]
pub struct PeerInfoCollector(watch::Sender<HashMap<ConnectionKey, ConnectionData>>);
147
148impl PeerInfoCollector {
149 pub fn collect(&self) -> Vec<PeerInfo> {
150 self.0
151 .borrow()
152 .iter()
153 .map(|(key, data)| data.peer_info(key.addr))
154 .collect()
155 }
156}
157
/// Handle representing ownership of one slot in the connection map.
///
/// Created by `ConnectionSet::reserve`; dropping it removes the matching entry.
pub(super) struct ConnectionPermit {
    // Shared handle to the map this permit's entry lives in.
    connections: watch::Sender<HashMap<ConnectionKey, ConnectionData>>,
    // Key of the entry owned by this permit.
    key: ConnectionKey,
    // Id assigned to the entry when it was reserved.
    id: ConnectionId,
}
165
166impl ConnectionPermit {
167 pub fn mark_as_connecting(&self) {
168 self.set_state(InnerPeerState::Connecting);
169 }
170
171 pub fn mark_as_handshaking(&self) {
172 self.set_state(InnerPeerState::Handshaking);
173 }
174
175 pub fn mark_as_active(&self, runtime_id: PublicRuntimeId, peer_key: usize) {
176 self.set_state(InnerPeerState::Active {
177 id: runtime_id,
178 since: SystemTime::now(),
179 peer_key,
180 });
181 }
182
183 fn set_state(&self, new_state: InnerPeerState) {
184 self.connections.send_if_modified(|connections| {
185 let peer = connections.get_mut(&self.key).unwrap();
187
188 if peer.state != new_state {
189 peer.state = new_state;
190 true
191 } else {
192 false
193 }
194 });
195 }
196
197 pub fn released(&self) -> AwaitDrop {
199 self.with(|data| data.on_release.subscribe())
200 }
201
202 pub fn addr(&self) -> PeerAddr {
203 self.key.addr
204 }
205
206 pub fn id(&self) -> ConnectionId {
207 self.id
208 }
209
210 pub fn source(&self) -> PeerSource {
211 self.with(|data| data.source)
212 }
213
214 pub fn byte_counters(&self) -> Arc<ByteCounters> {
215 self.with(|data| data.stats_tracker.bytes.clone())
216 }
217
218 fn with<F, R>(&self, f: F) -> R
219 where
220 F: FnOnce(&ConnectionData) -> R,
221 {
222 f(self.connections.borrow().get(&self.key).unwrap())
224 }
225}
226
227impl fmt::Debug for ConnectionPermit {
228 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
229 f.debug_struct("ConnectionPermit")
230 .field("key", &self.key)
231 .field("id", &self.id)
232 .finish_non_exhaustive()
233 }
234}
235
236impl Drop for ConnectionPermit {
237 fn drop(&mut self) {
238 self.connections.send_if_modified(|connections| {
239 let Entry::Occupied(entry) = connections.entry(self.key) else {
240 return false;
241 };
242
243 if entry.get().id != self.id {
244 return false;
245 }
246
247 entry.remove();
248 true
249 });
250 }
251}
252
/// Map key of a connection: the peer's address combined with the connection direction,
/// so the same address can have one entry per direction.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub(super) struct ConnectionKey {
    addr: PeerAddr,
    dir: ConnectionDirection,
}
258
/// Per-connection record stored in the connection map.
pub(super) struct ConnectionData {
    // Id assigned when the slot was reserved.
    id: ConnectionId,
    // Current lifecycle state; starts as `Known`.
    state: InnerPeerState,
    // Source passed to `ConnectionSet::reserve`.
    source: PeerSource,
    // Traffic statistics for this connection.
    stats_tracker: StatsTracker,
    // Handle from which release subscriptions (`AwaitDrop`) are created.
    on_release: DropAwaitable,
}
266
267impl ConnectionData {
268 fn peer_info(&self, addr: PeerAddr) -> PeerInfo {
269 let stats = self.stats_tracker.read();
270
271 PeerInfo {
272 addr,
273 source: self.source,
274 state: self.state.to_peer_state(),
275 stats,
276 }
277 }
278}
279
/// Internal connection state. Mirrors `PeerState`, except that `Active` additionally
/// carries the `peer_key`.
#[derive(Eq, PartialEq)]
enum InnerPeerState {
    /// Initial state of a freshly reserved connection.
    Known,
    /// Set by `ConnectionPermit::mark_as_connecting`.
    Connecting,
    /// Set by `ConnectionPermit::mark_as_handshaking`.
    Handshaking,
    /// Set by `ConnectionPermit::mark_as_active`.
    Active {
        // Runtime id of the peer.
        id: PublicRuntimeId,
        // System time at which the connection was marked active.
        since: SystemTime,
        // Key returned by `ConnectionSet::get_peer_key`; not exposed through `PeerState`.
        peer_key: usize,
    },
}
291
292impl InnerPeerState {
293 fn to_peer_state(&self) -> PeerState {
294 match self {
295 Self::Known => PeerState::Known,
296 Self::Connecting => PeerState::Connecting,
297 Self::Handshaking => PeerState::Handshaking,
298 Self::Active { id, since, .. } => PeerState::Active {
299 id: *id,
300 since: *since,
301 },
302 }
303 }
304}