ouisync/joint_directory/
mod.rs

1#[cfg(test)]
2mod tests;
3
4use crate::{
5    branch::Branch,
6    conflict,
7    crypto::sign::PublicKey,
8    directory::{
9        self, Directory, DirectoryFallback, DirectoryRef, EntryRef, EntryTombstoneData, EntryType,
10        FileRef,
11    },
12    error::{Error, Result},
13    file::File,
14    iterator::{Accumulate, SortedUnion},
15    store,
16    version_vector::VersionVector,
17    versioned::{self, PreferBranch},
18};
19use async_recursion::async_recursion;
20use camino::{Utf8Component, Utf8Path};
21use either::Either;
22use std::{
23    borrow::Cow,
24    collections::{BTreeMap, VecDeque},
25    fmt, iter, mem,
26};
27use tracing::{instrument, Instrument};
28
/// Unified view over multiple concurrent versions of a directory.
///
/// Every version is stored under the id of the branch it belongs to, so the view holds at most
/// one version per branch.
#[derive(Clone)]
pub struct JointDirectory {
    // Directory versions indexed by the id of their branch.
    versions: BTreeMap<PublicKey, Directory>,
    // Branch that modifications are applied to. `None` makes this joint directory read-only.
    local_branch: Option<Branch>,
}
35
36impl JointDirectory {
37    /// Creates a new `JointDirectory` over the specified directory versions.
38    ///
39    /// Note: if `local_branch` is `None` then the created joint directory is read-only.
40    pub fn new<I>(local_branch: Option<Branch>, versions: I) -> Self
41    where
42        I: IntoIterator<Item = Directory>,
43    {
44        let versions = versions
45            .into_iter()
46            .map(|dir| (*dir.branch().id(), dir))
47            .collect();
48
49        Self {
50            versions,
51            local_branch,
52        }
53    }
54
55    pub(crate) fn local_version(&self) -> Option<&Directory> {
56        self.local_branch
57            .as_ref()
58            .and_then(|branch| self.versions.get(branch.id()))
59    }
60
61    pub(crate) fn local_version_mut(&mut self) -> Option<&mut Directory> {
62        self.local_branch
63            .as_ref()
64            .and_then(|branch| self.versions.get_mut(branch.id()))
65    }
66
67    pub fn is_empty(&self) -> bool {
68        self.entries().next().is_none()
69    }
70
71    /// Returns iterator over the entries of this directory. Multiple concurrent versions of the
72    /// same file are returned as separate `JointEntryRef::File` entries. Multiple concurrent
73    /// versions of the same directory are returned as a single `JointEntryRef::Directory` entry.
74    pub fn entries(&self) -> impl Iterator<Item = JointEntryRef> {
75        self.merge_entries()
76            .flat_map(|(_, merge)| merge.ignore_tombstones())
77    }
78
79    fn merge_entries(&self) -> impl Iterator<Item = (&str, Merge)> {
80        let entries = self.versions.values().map(|directory| directory.entries());
81        let entries = SortedUnion::new(entries, |entry| entry.name());
82        let entries = Accumulate::new(entries, |entry| entry.name());
83        entries.map(|(name, entries)| {
84            (
85                name,
86                Merge::new(entries.into_iter(), self.local_branch.as_ref()),
87            )
88        })
89    }
90
91    /// Returns all versions of an entry with the given name. Concurrent file versions are returned
92    /// separately but concurrent directory versions are merged into a single `JointDirectory`.
93    pub fn lookup<'a>(&'a self, name: &'a str) -> impl Iterator<Item = JointEntryRef<'a>> + 'a {
94        Merge::new(
95            self.versions
96                .values()
97                .filter_map(move |dir| dir.lookup(name).ok()),
98            self.local_branch.as_ref(),
99        )
100        .ignore_tombstones()
101    }
102
103    /// Looks up single entry with the specified name if it is unique.
104    ///
105    /// - If there is only one version of a entry with the specified name, it is returned.
106    /// - If there are multiple versions and all of them are files, an `AmbiguousEntry` error is
107    ///   returned. To lookup a single version, include a disambiguator in the `name`.
108    /// - If there are multiple versiond and all of them are directories, they are merged into a
109    ///   single `JointEntryRef::Directory` and returned.
110    /// - Finally, if there are both files and directories, only the directories are retured (merged
111    ///   into a `JointEntryRef::Directory`) and the files are discarded. This is so it's possible
112    ///   to unambiguously lookup a directory even in the presence of conflicting files.
113    pub fn lookup_unique<'a>(&'a self, name: &'a str) -> Result<JointEntryRef<'a>> {
114        // First try exact match as it is more common.
115        let mut entries =
116            Merge::new(self.entry_versions(name), self.local_branch.as_ref()).ignore_tombstones();
117        if let Some(entry) = entries.next() {
118            if entries.next().is_none() {
119                return Ok(entry);
120            } else {
121                return Err(Error::AmbiguousEntry);
122            }
123        }
124
125        // If not found, extract the disambiguator and try to lookup an entry whose branch id
126        // matches it.
127        let (name, branch_id_prefix) = conflict::parse_unique_name(name);
128        let branch_id_prefix = branch_id_prefix.ok_or(Error::EntryNotFound)?;
129
130        let mut entries = Merge::new(self.entry_versions(name), self.local_branch.as_ref())
131            .ignore_tombstones()
132            .filter(|entry| entry.first_branch().id().starts_with(&branch_id_prefix));
133
134        let first = entries.next().ok_or(Error::EntryNotFound)?;
135
136        if entries.next().is_none() {
137            Ok(first)
138        } else {
139            Err(Error::AmbiguousEntry)
140        }
141    }
142
143    /// Looks up a specific version of a file.
144    #[instrument(skip(self), err(Debug))]
145    pub fn lookup_version(&self, name: &'_ str, branch_id: &'_ PublicKey) -> Result<FileRef> {
146        self.versions
147            .get(branch_id)
148            .ok_or(Error::EntryNotFound)
149            .and_then(|dir| dir.lookup(name))
150            .and_then(|entry| entry.file())
151    }
152
153    /// Length of the directory in bytes. If there are multiple versions, returns the sum of their
154    /// lengths.
155    #[allow(clippy::len_without_is_empty)]
156    pub fn len(&self) -> u64 {
157        self.versions.values().map(|dir| dir.len()).sum()
158    }
159
160    pub fn has_local_version(&self) -> bool {
161        self.local_branch
162            .as_ref()
163            .map(|local_branch| self.versions.contains_key(local_branch.id()))
164            .unwrap_or(false)
165    }
166
167    /// Descends into an arbitrarily nested subdirectory of this directory at the specified path.
168    /// Note: non-normalized paths (i.e. containing "..") or Windows-style drive prefixes
169    /// (e.g. "C:") are not supported.
170    pub async fn cd(&self, path: impl AsRef<Utf8Path>) -> Result<Self> {
171        let mut curr = Cow::Borrowed(self);
172
173        for component in path.as_ref().components() {
174            match component {
175                Utf8Component::RootDir | Utf8Component::CurDir => (),
176                Utf8Component::Normal(name) => {
177                    let next = curr
178                        .lookup(name)
179                        .find_map(|entry| entry.directory().ok())
180                        .ok_or(Error::EntryNotFound)?
181                        .open()
182                        .await?;
183                    curr = Cow::Owned(next);
184                }
185                Utf8Component::ParentDir | Utf8Component::Prefix(_) => {
186                    return Err(Error::OperationNotSupported)
187                }
188            }
189        }
190
191        Ok(curr.into_owned())
192    }
193
194    /// Removes the specified entry from this directory. If the entry is a subdirectory, it has to
195    /// be empty. Use [Self::remove_entry_recursively] to remove non-empty subdirectories.
196    pub async fn remove_entry(&mut self, name: &str) -> Result<()> {
197        self.remove_entries(Pattern::Unique(name)).await
198    }
199
200    /// Removes the specified entry from this directory, including all its content if it is a
201    /// subdirectory.
202    pub async fn remove_entry_recursively(&mut self, name: &str) -> Result<()> {
203        self.remove_entries_recursively(Pattern::Unique(name)).await
204    }
205
206    async fn remove_entries(&mut self, pattern: Pattern<'_>) -> Result<()> {
207        let local_branch = self.local_branch.as_ref().ok_or(Error::PermissionDenied)?;
208
209        let entries: Vec<_> = pattern
210            .apply(self)?
211            .map(|entry| {
212                let name = entry.name().to_owned();
213                let branch_id = match &entry {
214                    JointEntryRef::File(entry) => *entry.branch().id(),
215                    JointEntryRef::Directory(_) => *local_branch.id(),
216                };
217                let vv = entry.version_vector().into_owned();
218
219                (name, branch_id, vv)
220            })
221            .collect();
222
223        let local_version = self.fork().await?;
224
225        for (name, branch_id, vv) in entries {
226            local_version.remove_entry(&name, &branch_id, vv).await?;
227        }
228
229        Ok(())
230    }
231
    // Removes all entries matching `pattern`, first emptying any matching subdirectories.
    #[async_recursion]
    async fn remove_entries_recursively<'a>(&'a mut self, pattern: Pattern<'a>) -> Result<()> {
        // Empty every matching subdirectory first. Files matched by the pattern are left for the
        // final `remove_entries` call.
        for entry in pattern.apply(self)?.filter_map(|e| e.directory().ok()) {
            let mut dir = entry
                .open_with(MissingVersionStrategy::Skip, DirectoryFallback::Disabled)
                .await?;
            dir.remove_entries_recursively(Pattern::All).await?;
        }

        // Refresh the local version (if any) before removing the matched entries themselves.
        // NOTE(review): presumably needed because the recursion above modified the
        // subdirectories through separately opened handles - confirm.
        if let Some(local_version) = self.local_version_mut() {
            local_version.refresh().await?;
        }

        self.remove_entries(pattern).await
    }
247
    /// Merge all versions of this `JointDirectory` into a single `Directory`.
    ///
    /// In the presence of conflicts (multiple concurrent versions of the same file) this function
    /// still proceeds as far as it can, but the conflicting files remain unmerged. It signals this
    /// by returning `Error::AmbiguousEntry`.
    ///
    /// On success the local version of the directory is returned. If the local version vector is
    /// non-empty and already greater than or equal to the merged version vector of all versions,
    /// nothing is merged and the existing local version is returned straight away.
    #[async_recursion]
    pub async fn merge(&mut self) -> Result<Directory> {
        // An empty version vector stands for "no local version yet".
        let old_version_vector = if let Some(local_version) = self.local_version() {
            local_version.version_vector().await?
        } else {
            VersionVector::new()
        };

        let new_version_vector = self.merge_version_vectors().await?;

        if !old_version_vector.is_empty() && old_version_vector >= new_version_vector {
            // Local version already up to date, nothing to do.
            tracing::trace!(old = ?old_version_vector, "Merge not started - already up to date");
            // unwrap is ok because if old_version_vector is non-empty it means the local version
            // must exist.
            return Ok(self.local_version().unwrap().clone());
        } else {
            tracing::trace!(old = ?old_version_vector, new = ?new_version_vector, "Merge started");
        }

        // Make sure the local version exists. Only the branch handle is kept; the local version
        // itself is looked up again once the loop below no longer borrows `self`.
        let local_version = self.fork().await?;
        let local_branch = local_version.branch().clone();

        let mut conflict = false;
        // Tombstones are only collected here and applied after the loop.
        let mut check_for_removal = Vec::new();

        for (name, merge) in self.merge_entries() {
            match merge {
                Merge::Existing(existing) => {
                    for entry in existing {
                        match entry {
                            JointEntryRef::File(entry) => {
                                match entry.fork(&local_branch).await {
                                    Ok(()) => {}
                                    Err(Error::EntryExists) => {
                                        // This error indicates the local and the remote files are in conflict and
                                        // so can't be automatically merged. We still proceed with merging the
                                        // remaining entries but we won't mark this directory as merged (by bumping its
                                        // vv) to prevent the conflicting remote file from being collected.
                                        conflict = true;
                                    }
                                    Err(error) => return Err(error),
                                }
                            }
                            JointEntryRef::Directory(entry) => {
                                // Subdirectories are merged recursively. All their versions must
                                // be openable, otherwise the whole merge fails.
                                let mut dir = entry
                                    .open_with(
                                        MissingVersionStrategy::Fail,
                                        DirectoryFallback::Disabled,
                                    )
                                    .await?;
                                match dir
                                    .merge()
                                    .instrument(tracing::info_span!("dir", message = name))
                                    .await
                                {
                                    Ok(_) => (),
                                    Err(Error::AmbiguousEntry) => {
                                        // A conflict inside the subdirectory makes this directory
                                        // conflicted too, but the remaining entries still get merged.
                                        conflict = true;
                                    }
                                    Err(error) => return Err(error),
                                }
                            }
                        }
                    }
                }
                Merge::Tombstone(tombstone) => {
                    check_for_removal.push((name.to_owned(), tombstone));
                }
            }
        }

        // unwrap is ok because we ensured the local version exists by calling `fork` at the
        // beginning of this function.
        let local_version = self.local_version_mut().unwrap();
        local_version.refresh().await?;

        for (name, tombstone) in check_for_removal {
            local_version.create_tombstone(&name, tombstone).await?;
        }

        // Need to bump the root version vector to reflect any non-filesystem changes (e.g.,
        // removal of nodes during garbage collection).
        if !conflict && local_version.is_root() {
            directory::bump_root(&local_branch, new_version_vector).await?;
        }

        // The version vector is fetched only for the trace output, so skip it unless TRACE is
        // enabled.
        if tracing::enabled!(tracing::Level::TRACE) {
            let vv = local_version.version_vector().await?;
            tracing::trace!(?vv, ?conflict, "Merge completed");
        }

        if conflict {
            Err(Error::AmbiguousEntry)
        } else {
            Ok(local_version.clone())
        }
    }
351
352    // Merge the version vectors of all the versions in this joint directory.
353    async fn merge_version_vectors(&self) -> Result<VersionVector> {
354        let mut outcome = VersionVector::new();
355
356        for version in self.versions.values() {
357            outcome.merge(&version.version_vector().await?);
358        }
359
360        Ok(outcome)
361    }
362
363    async fn fork(&mut self) -> Result<&mut Directory> {
364        let local_branch = self.local_branch.as_ref().ok_or(Error::PermissionDenied)?;
365
366        let mut local_version = None;
367
368        // Need to `fork` from each branch individually so that the local version vector is
369        // properly updated.
370        for (branch_id, version) in &self.versions {
371            if branch_id == local_branch.id() {
372                continue;
373            }
374
375            local_version = Some(version.fork(local_branch).await?);
376        }
377
378        if let Some(local_version) = local_version {
379            self.versions.insert(*local_branch.id(), local_version);
380        }
381
382        // TODO: This can return error only if this `JointDirectory` contains no versions which should
383        // never happen. Consider making it an invariant and doing `unwrap` / `expect` here instead.
384        self.versions
385            .get_mut(local_branch.id())
386            .ok_or(Error::EntryNotFound)
387    }
388
389    fn entry_versions<'a>(&'a self, name: &'a str) -> impl Iterator<Item = EntryRef<'a>> {
390        self.versions
391            .values()
392            .filter_map(move |v| v.lookup(name).ok())
393    }
394}
395
396impl fmt::Debug for JointDirectory {
397    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
398        f.debug_struct("JointDirectory").finish()
399    }
400}
401
402#[derive(Debug)]
403pub enum JointEntryRef<'a> {
404    File(JointFileRef<'a>),
405    Directory(JointDirectoryRef<'a>),
406}
407
408impl<'a> JointEntryRef<'a> {
409    pub fn name(&self) -> &'a str {
410        match self {
411            Self::File(r) => r.name(),
412            Self::Directory(r) => r.name(),
413        }
414    }
415
416    pub fn unique_name(&self) -> Cow<'a, str> {
417        match self {
418            Self::File(r) => r.unique_name(),
419            Self::Directory(r) => r.unique_name(),
420        }
421    }
422
423    pub fn entry_type(&self) -> EntryType {
424        match self {
425            Self::File(_) => EntryType::File,
426            Self::Directory(_) => EntryType::Directory,
427        }
428    }
429
430    pub fn version_vector(&'a self) -> Cow<'a, VersionVector> {
431        match self {
432            Self::File(r) => Cow::Borrowed(r.version_vector()),
433            Self::Directory(r) => Cow::Owned(r.version_vector()),
434        }
435    }
436
437    pub fn file(self) -> Result<FileRef<'a>> {
438        match self {
439            Self::File(r) => Ok(r.file),
440            Self::Directory(_) => Err(Error::EntryIsDirectory),
441        }
442    }
443
444    pub fn directory(self) -> Result<JointDirectoryRef<'a>> {
445        match self {
446            Self::Directory(r) => Ok(r),
447            Self::File(_) => Err(Error::EntryIsFile),
448        }
449    }
450
451    fn first_branch(&self) -> &Branch {
452        match self {
453            Self::File(r) => r.branch(),
454            Self::Directory(r) => r.first_version().branch(),
455        }
456    }
457}
458
459#[derive(Debug)]
460pub struct JointFileRef<'a> {
461    file: FileRef<'a>,
462    needs_disambiguation: bool,
463}
464
465impl<'a> JointFileRef<'a> {
466    pub fn name(&self) -> &'a str {
467        self.file.name()
468    }
469
470    pub fn unique_name(&self) -> Cow<'a, str> {
471        if self.needs_disambiguation {
472            Cow::from(conflict::create_unique_name(
473                self.name(),
474                self.file.branch().id(),
475            ))
476        } else {
477            Cow::from(self.name())
478        }
479    }
480
481    pub async fn open(&self) -> Result<File> {
482        self.file.open().await
483    }
484
485    pub(crate) async fn fork(&self, dst_branch: &Branch) -> Result<()> {
486        self.file.fork(dst_branch).await
487    }
488
489    pub fn version_vector(&self) -> &'a VersionVector {
490        self.file.version_vector()
491    }
492
493    pub fn branch(&self) -> &Branch {
494        self.file.branch()
495    }
496
497    pub fn parent(&self) -> &Directory {
498        self.file.parent()
499    }
500
501    pub fn inner(&self) -> FileRef<'a> {
502        self.file
503    }
504}
505
506pub struct JointDirectoryRef<'a> {
507    versions: Vec<DirectoryRef<'a>>,
508    local_branch: Option<&'a Branch>,
509    needs_disambiguation: bool,
510}
511
512impl<'a> JointDirectoryRef<'a> {
513    fn new(
514        versions: Vec<DirectoryRef<'a>>,
515        local_branch: Option<&'a Branch>,
516        needs_disambiguation: bool,
517    ) -> Option<Self> {
518        if versions.is_empty() {
519            None
520        } else {
521            Some(Self {
522                versions,
523                local_branch,
524                needs_disambiguation,
525            })
526        }
527    }
528
529    pub fn name(&self) -> &'a str {
530        self.first_version().name()
531    }
532
533    pub fn unique_name(&self) -> Cow<'a, str> {
534        if self.needs_disambiguation {
535            Cow::from(conflict::create_unique_name(
536                self.name(),
537                self.first_version().branch().id(),
538            ))
539        } else {
540            Cow::from(self.name())
541        }
542    }
543
544    pub fn version_vector(&self) -> VersionVector {
545        self.versions
546            .iter()
547            .fold(VersionVector::new(), |mut vv, dir| {
548                vv.merge(dir.version_vector());
549                vv
550            })
551    }
552
553    pub async fn open(&self) -> Result<JointDirectory> {
554        self.open_with(MissingVersionStrategy::Skip, DirectoryFallback::Enabled)
555            .await
556    }
557
558    pub(crate) async fn open_with(
559        &self,
560        missing_version_strategy: MissingVersionStrategy,
561        fallback: DirectoryFallback,
562    ) -> Result<JointDirectory> {
563        let mut versions = Vec::new();
564        for version in &self.versions {
565            match version.open(fallback).await {
566                Ok(open_dir) => versions.push(open_dir),
567                Err(e)
568                    if self
569                        .local_branch
570                        .map(|local_branch| version.branch().id() == local_branch.id())
571                        .unwrap_or(false) =>
572                {
573                    return Err(e)
574                }
575                Err(Error::Store(store::Error::BlockNotFound))
576                    if matches!(missing_version_strategy, MissingVersionStrategy::Skip) =>
577                {
578                    // Some of the directories on remote branches may fail due to them not yet
579                    // being fully downloaded from remote peers. This is OK and we'll treat such
580                    // cases as if this replica doesn't know about those directories.
581                    continue;
582                }
583                Err(e) => return Err(e),
584            }
585        }
586
587        Ok(JointDirectory::new(self.local_branch.cloned(), versions))
588    }
589
590    pub(crate) fn versions(&self) -> &[DirectoryRef] {
591        &self.versions
592    }
593
594    fn first_version(&self) -> &DirectoryRef<'a> {
595        self.versions
596            .first()
597            .expect("joint directory must contain at least one directory")
598    }
599}
600
601impl fmt::Debug for JointDirectoryRef<'_> {
602    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
603        f.debug_struct("JointDirectoryRef")
604            .field("name", &self.name())
605            .finish()
606    }
607}
608
/// How to handle opening a joint directory that has some versions that are not fully loaded yet.
///
/// This only affects versions from non-local branches whose opening fails because a block is
/// not found. A failure to open the version from the local branch is an error regardless of the
/// strategy.
#[derive(Copy, Clone)]
pub enum MissingVersionStrategy {
    /// Ignore the missing versions, as if they didn't exist.
    Skip,
    /// Fail the whole open operation
    Fail,
}
617
// Outcome of merging all the versions (`EntryRef`s) of a single entry: the outdated versions
// (according to their version vectors) are filtered out, all the remaining
// `EntryRef::Directory` items are merged into a single `JointDirectoryRef` item and the
// `EntryRef::File` items are kept separate. Iterating over the `Existing` payload yields the
// resulting `JointEntryRef`s.
#[derive(Clone)]
enum Merge<'a> {
    // zero or more versions of an entry...
    Existing(Existing<'a>),
    // ...or a single tombstone
    Tombstone(EntryTombstoneData),
}
629
630#[derive(Default, Clone)]
631struct Existing<'a> {
632    // TODO: The most common case for files shall be that there will be only one version of it.
633    // Thus it might make sense to have one place holder for the first file to avoid Vec allocation
634    // when not needed.
635    files: VecDeque<FileRef<'a>>,
636    directories: Vec<DirectoryRef<'a>>,
637    needs_disambiguation: bool,
638    local_branch: Option<&'a Branch>,
639}
640
641impl<'a> Iterator for Existing<'a> {
642    type Item = JointEntryRef<'a>;
643
644    fn next(&mut self) -> Option<Self::Item> {
645        if let Some(dir) = JointDirectoryRef::new(
646            mem::take(&mut self.directories),
647            self.local_branch,
648            self.needs_disambiguation,
649        ) {
650            return Some(JointEntryRef::Directory(dir));
651        }
652
653        Some(JointEntryRef::File(JointFileRef {
654            file: self.files.pop_front()?,
655            needs_disambiguation: self.needs_disambiguation,
656        }))
657    }
658}
659
660impl<'a> Merge<'a> {
661    // All these entries are expected to have the same name. They can be either files, directories
662    // or a mix of the two.
663    fn new<I>(entries: I, local_branch: Option<&'a Branch>) -> Self
664    where
665        I: Iterator<Item = EntryRef<'a>>,
666    {
667        let mut files = VecDeque::new();
668        let mut directories = vec![];
669        let mut tombstone: Option<EntryTombstoneData> = None;
670
671        // Note that doing this will remove files that have been removed by tombstones as well.
672        let entries = versioned::keep_maximal(entries, PreferBranch(local_branch.map(Branch::id)));
673
674        for entry in entries {
675            match entry {
676                EntryRef::File(file) => files.push_back(file),
677                EntryRef::Directory(dir) => directories.push(dir),
678                EntryRef::Tombstone(_) if !files.is_empty() || !directories.is_empty() => continue,
679                EntryRef::Tombstone(new_tombstone) => {
680                    let new_tombstone = if let Some(mut old_tombstone) = tombstone.take() {
681                        old_tombstone.merge(new_tombstone.data());
682                        old_tombstone
683                    } else {
684                        new_tombstone.data().clone()
685                    };
686
687                    tombstone = Some(new_tombstone);
688                }
689            }
690        }
691
692        let needs_disambiguation = files.len() + if directories.is_empty() { 0 } else { 1 } > 1;
693
694        match tombstone {
695            Some(tombstone) if files.is_empty() && directories.is_empty() => {
696                Self::Tombstone(tombstone)
697            }
698            Some(_) | None => Self::Existing(Existing {
699                files,
700                directories,
701                needs_disambiguation,
702                local_branch,
703            }),
704        }
705    }
706
707    fn ignore_tombstones(self) -> Existing<'a> {
708        match self {
709            Self::Existing(existing) => existing,
710            Self::Tombstone(_) => Existing::default(),
711        }
712    }
713}
714
715enum Pattern<'a> {
716    // Fetch all entries
717    All,
718    // Fetch single entry that matches the given unique name
719    Unique(&'a str),
720}
721
722impl<'a> Pattern<'a> {
723    fn apply(&self, dir: &'a JointDirectory) -> Result<impl Iterator<Item = JointEntryRef<'a>>> {
724        match self {
725            Self::All => Ok(Either::Left(dir.entries())),
726            Self::Unique(name) => dir
727                .lookup_unique(name)
728                .map(|entry| Either::Right(iter::once(entry))),
729        }
730    }
731}