ouisync/network/dht/
lookup_stream.rs1use std::{
2 pin::Pin,
3 task::{Context, Poll, ready},
4};
5
6use btdht::InfoHash;
7use futures_util::{Stream, StreamExt};
8use tokio::sync::mpsc;
9use tokio_stream::wrappers::UnboundedReceiverStream;
10
11use super::{super::PeerAddr, DhtDiscovery, DhtEvent, LookupRequest};
12
13pub struct DhtLookupStream {
15 request: Option<LookupRequest>,
16 event_rx: UnboundedReceiverStream<DhtEvent>,
17 allow_local: bool,
18}
19
20impl DhtLookupStream {
21 pub(in super::super) fn start(
22 dht: &DhtDiscovery,
23 info_hash: InfoHash,
24 announce: bool,
25 allow_local: bool,
26 ) -> Self {
27 let (peer_tx, peer_rx) = mpsc::unbounded_channel();
28 let request = dht.start_lookup(info_hash, announce, peer_tx);
29
30 Self {
31 request: Some(request),
32 event_rx: UnboundedReceiverStream::new(peer_rx),
33 allow_local,
34 }
35 }
36
37 pub fn empty() -> Self {
39 Self {
40 request: None,
41 event_rx: UnboundedReceiverStream::new(mpsc::unbounded_channel().1),
42 allow_local: false,
43 }
44 }
45}
46
47impl Stream for DhtLookupStream {
48 type Item = PeerAddr;
49
50 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
51 while let Some(event) = ready!(self.event_rx.poll_next_unpin(cx)) {
52 match event {
53 DhtEvent::PeerFound(peer) => {
54 if !self.allow_local && peer.initial_addr().is_local() {
55 continue;
56 }
57
58 if let Some(addr) = peer.addr_if_seen() {
59 return Poll::Ready(Some(*addr));
60 }
61 }
62 DhtEvent::RoundEnded => break,
63 }
64 }
65
66 self.request.take();
68
69 Poll::Ready(None)
70 }
71}