ouisync/network/dht/
lookup_stream.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll, ready},
4};
5
6use btdht::InfoHash;
7use futures_util::{Stream, StreamExt};
8use tokio::sync::mpsc;
9use tokio_stream::wrappers::UnboundedReceiverStream;
10
11use super::{super::PeerAddr, DhtDiscovery, DhtEvent, LookupRequest};
12
13/// Stream returned from [`Network::dht_lookup`].
14pub struct DhtLookupStream {
15    request: Option<LookupRequest>,
16    event_rx: UnboundedReceiverStream<DhtEvent>,
17    allow_local: bool,
18}
19
20impl DhtLookupStream {
21    pub(in super::super) fn start(
22        dht: &DhtDiscovery,
23        info_hash: InfoHash,
24        announce: bool,
25        allow_local: bool,
26    ) -> Self {
27        let (peer_tx, peer_rx) = mpsc::unbounded_channel();
28        let request = dht.start_lookup(info_hash, announce, peer_tx);
29
30        Self {
31            request: Some(request),
32            event_rx: UnboundedReceiverStream::new(peer_rx),
33            allow_local,
34        }
35    }
36
37    /// Create DHT lookup that yields no peer.
38    pub fn empty() -> Self {
39        Self {
40            request: None,
41            event_rx: UnboundedReceiverStream::new(mpsc::unbounded_channel().1),
42            allow_local: false,
43        }
44    }
45}
46
47impl Stream for DhtLookupStream {
48    type Item = PeerAddr;
49
50    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
51        while let Some(event) = ready!(self.event_rx.poll_next_unpin(cx)) {
52            match event {
53                DhtEvent::PeerFound(peer) => {
54                    if !self.allow_local && peer.initial_addr().is_local() {
55                        continue;
56                    }
57
58                    if let Some(addr) = peer.addr_if_seen() {
59                        return Poll::Ready(Some(*addr));
60                    }
61                }
62                DhtEvent::RoundEnded => break,
63            }
64        }
65
66        // Stop the lookup after one round.
67        self.request.take();
68
69        Poll::Ready(None)
70    }
71}