ouisync/joint_directory/mod.rs

1#[cfg(test)]
2mod tests;
3
4use crate::{
5    branch::Branch,
6    conflict,
7    crypto::sign::PublicKey,
8    directory::{
9        self, Directory, DirectoryFallback, DirectoryRef, EntryRef, EntryTombstoneData, EntryType,
10        FileRef,
11    },
12    error::{Error, Result},
13    file::File,
14    iterator::{Accumulate, SortedUnion},
15    store,
16    version_vector::VersionVector,
17    versioned::{self, PreferBranch},
18};
19use async_recursion::async_recursion;
20use camino::{Utf8Component, Utf8Path};
21use either::Either;
22use std::{
23    borrow::Cow,
24    collections::{BTreeMap, VecDeque},
25    fmt, iter, mem,
26};
27use tracing::{instrument, Instrument};
28
29/// Unified view over multiple concurrent versions of a directory.
30#[derive(Clone)]
31pub struct JointDirectory {
32    versions: BTreeMap<PublicKey, Directory>,
33    local_branch: Option<Branch>,
34}
35
36impl JointDirectory {
37    /// Creates a new `JointDirectory` over the specified directory versions.
38    ///
39    /// Note: if `local_branch` is `None` then the created joint directory is read-only.
40    pub fn new<I>(local_branch: Option<Branch>, versions: I) -> Self
41    where
42        I: IntoIterator<Item = Directory>,
43    {
44        let versions = versions
45            .into_iter()
46            .map(|dir| (*dir.branch().id(), dir))
47            .collect();
48
49        Self {
50            versions,
51            local_branch,
52        }
53    }
54
55    pub(crate) fn local_version(&self) -> Option<&Directory> {
56        self.local_branch
57            .as_ref()
58            .and_then(|branch| self.versions.get(branch.id()))
59    }
60
61    pub(crate) fn local_version_mut(&mut self) -> Option<&mut Directory> {
62        self.local_branch
63            .as_ref()
64            .and_then(|branch| self.versions.get_mut(branch.id()))
65    }
66
67    pub fn is_empty(&self) -> bool {
68        self.entries().next().is_none()
69    }
70
71    /// Returns iterator over the entries of this directory. Multiple concurrent versions of the
72    /// same file are returned as separate `JointEntryRef::File` entries. Multiple concurrent
73    /// versions of the same directory are returned as a single `JointEntryRef::Directory` entry.
74    pub fn entries(&self) -> impl Iterator<Item = JointEntryRef<'_>> {
75        self.merge_entries()
76            .flat_map(|(_, merge)| merge.ignore_tombstones())
77    }
78
79    fn merge_entries(&self) -> impl Iterator<Item = (&str, Merge<'_>)> {
80        let entries = self.versions.values().map(|directory| directory.entries());
81        let entries = SortedUnion::new(entries, |entry| entry.name());
82        let entries = Accumulate::new(entries, |entry| entry.name());
83        entries.map(|(name, entries)| {
84            (
85                name,
86                Merge::new(entries.into_iter(), self.local_branch.as_ref()),
87            )
88        })
89    }
90
91    /// Returns all versions of an entry with the given name. Concurrent file versions are returned
92    /// separately but concurrent directory versions are merged into a single `JointDirectory`.
93    pub fn lookup<'a>(&'a self, name: &'a str) -> impl Iterator<Item = JointEntryRef<'a>> + 'a {
94        Merge::new(
95            self.versions
96                .values()
97                .filter_map(move |dir| dir.lookup(name).ok()),
98            self.local_branch.as_ref(),
99        )
100        .ignore_tombstones()
101    }
102
103    /// Looks up single entry with the specified name if it is unique.
104    ///
105    /// - If there is only one version of a entry with the specified name, it is returned.
106    /// - If there are multiple versions and all of them are files, an `AmbiguousEntry` error is
107    ///   returned. To lookup a single version, include a disambiguator in the `name`.
108    /// - If there are multiple versiond and all of them are directories, they are merged into a
109    ///   single `JointEntryRef::Directory` and returned.
110    /// - Finally, if there are both files and directories, only the directories are retured (merged
111    ///   into a `JointEntryRef::Directory`) and the files are discarded. This is so it's possible
112    ///   to unambiguously lookup a directory even in the presence of conflicting files.
113    pub fn lookup_unique<'a>(&'a self, name: &'a str) -> Result<JointEntryRef<'a>> {
114        // First try exact match as it is more common.
115        let mut entries =
116            Merge::new(self.entry_versions(name), self.local_branch.as_ref()).ignore_tombstones();
117        if let Some(entry) = entries.next() {
118            if entries.next().is_none() {
119                return Ok(entry);
120            } else {
121                return Err(Error::AmbiguousEntry);
122            }
123        }
124
125        // If not found, extract the disambiguator and try to lookup an entry whose branch id
126        // matches it.
127        let (name, branch_id_prefix) = conflict::parse_unique_name(name);
128        let branch_id_prefix = branch_id_prefix.ok_or(Error::EntryNotFound)?;
129
130        let mut entries = Merge::new(self.entry_versions(name), self.local_branch.as_ref())
131            .ignore_tombstones()
132            .filter(|entry| entry.first_branch().id().starts_with(&branch_id_prefix));
133
134        let first = entries.next().ok_or(Error::EntryNotFound)?;
135
136        if entries.next().is_none() {
137            Ok(first)
138        } else {
139            Err(Error::AmbiguousEntry)
140        }
141    }
142
143    /// Looks up a specific version of a file.
144    #[instrument(skip(self), err(Debug))]
145    pub fn lookup_version<'a>(
146        &'a self,
147        name: &'a str,
148        branch_id: &'a PublicKey,
149    ) -> Result<FileRef<'a>> {
150        self.versions
151            .get(branch_id)
152            .ok_or(Error::EntryNotFound)
153            .and_then(|dir| dir.lookup(name))
154            .and_then(|entry| entry.file())
155    }
156
157    /// Length of the directory in bytes. If there are multiple versions, returns the sum of their
158    /// lengths.
159    #[allow(clippy::len_without_is_empty)]
160    pub fn len(&self) -> u64 {
161        self.versions.values().map(|dir| dir.len()).sum()
162    }
163
164    pub fn has_local_version(&self) -> bool {
165        self.local_branch
166            .as_ref()
167            .map(|local_branch| self.versions.contains_key(local_branch.id()))
168            .unwrap_or(false)
169    }
170
171    /// Descends into an arbitrarily nested subdirectory of this directory at the specified path.
172    /// Note: non-normalized paths (i.e. containing "..") or Windows-style drive prefixes
173    /// (e.g. "C:") are not supported.
174    pub async fn cd(&self, path: impl AsRef<Utf8Path>) -> Result<Self> {
175        let mut curr = Cow::Borrowed(self);
176
177        for component in path.as_ref().components() {
178            match component {
179                Utf8Component::RootDir | Utf8Component::CurDir => (),
180                Utf8Component::Normal(name) => {
181                    let next = curr
182                        .lookup(name)
183                        .find_map(|entry| entry.directory().ok())
184                        .ok_or(Error::EntryNotFound)?
185                        .open()
186                        .await?;
187                    curr = Cow::Owned(next);
188                }
189                Utf8Component::ParentDir | Utf8Component::Prefix(_) => {
190                    return Err(Error::OperationNotSupported)
191                }
192            }
193        }
194
195        Ok(curr.into_owned())
196    }
197
198    /// Removes the specified entry from this directory. If the entry is a subdirectory, it has to
199    /// be empty. Use [Self::remove_entry_recursively] to remove non-empty subdirectories.
200    pub async fn remove_entry(&mut self, name: &str) -> Result<()> {
201        self.remove_entries(Pattern::Unique(name)).await
202    }
203
204    /// Removes the specified entry from this directory, including all its content if it is a
205    /// subdirectory.
206    pub async fn remove_entry_recursively(&mut self, name: &str) -> Result<()> {
207        self.remove_entries_recursively(Pattern::Unique(name)).await
208    }
209
210    async fn remove_entries(&mut self, pattern: Pattern<'_>) -> Result<()> {
211        let local_branch = self.local_branch.as_ref().ok_or(Error::PermissionDenied)?;
212
213        let entries: Vec<_> = pattern
214            .apply(self)?
215            .map(|entry| {
216                let name = entry.name().to_owned();
217                let branch_id = match &entry {
218                    JointEntryRef::File(entry) => *entry.branch().id(),
219                    JointEntryRef::Directory(_) => *local_branch.id(),
220                };
221                let vv = entry.version_vector().into_owned();
222
223                (name, branch_id, vv)
224            })
225            .collect();
226
227        let local_version = self.fork().await?;
228
229        for (name, branch_id, vv) in entries {
230            local_version.remove_entry(&name, &branch_id, vv).await?;
231        }
232
233        Ok(())
234    }
235
236    #[async_recursion]
237    async fn remove_entries_recursively<'a>(&'a mut self, pattern: Pattern<'a>) -> Result<()> {
238        for entry in pattern.apply(self)?.filter_map(|e| e.directory().ok()) {
239            let mut dir = entry
240                .open_with(MissingVersionStrategy::Skip, DirectoryFallback::Disabled)
241                .await?;
242            dir.remove_entries_recursively(Pattern::All).await?;
243        }
244
245        if let Some(local_version) = self.local_version_mut() {
246            local_version.refresh().await?;
247        }
248
249        self.remove_entries(pattern).await
250    }
251
252    /// Merge all versions of this `JointDirectory` into a single `Directory`.
253    ///
254    /// In the presence of conflicts (multiple concurrent versions of the same file) this function
255    /// still proceeds as far as it can, but the conflicting files remain unmerged. It signals this
256    /// by returning `Error::AmbiguousEntry`.
257    #[async_recursion]
258    pub async fn merge(&mut self) -> Result<Directory> {
259        let old_version_vector = if let Some(local_version) = self.local_version() {
260            local_version.version_vector().await?
261        } else {
262            VersionVector::new()
263        };
264
265        let new_version_vector = self.merge_version_vectors().await?;
266
267        if !old_version_vector.is_empty() && old_version_vector >= new_version_vector {
268            // Local version already up to date, nothing to do.
269            tracing::trace!(old = ?old_version_vector, "Merge not started - already up to date");
270            // unwrap is ok because if old_version_vector is non-empty it means the local version
271            // must exist.
272            return Ok(self.local_version().unwrap().clone());
273        } else {
274            tracing::trace!(old = ?old_version_vector, new = ?new_version_vector, "Merge started");
275        }
276
277        let local_version = self.fork().await?;
278        let local_branch = local_version.branch().clone();
279
280        let mut conflict = false;
281        let mut check_for_removal = Vec::new();
282
283        for (name, merge) in self.merge_entries() {
284            match merge {
285                Merge::Existing(existing) => {
286                    for entry in existing {
287                        match entry {
288                            JointEntryRef::File(entry) => {
289                                match entry.fork(&local_branch).await {
290                                    Ok(()) => {}
291                                    Err(Error::EntryExists) => {
292                                        // This error indicates the local and the remote files are in conflict and
293                                        // so can't be automatically merged. We still proceed with merging the
294                                        // remaining entries but we won't mark this directory as merged (by bumping its
295                                        // vv) to prevent the conflicting remote file from being collected.
296                                        conflict = true;
297                                    }
298                                    Err(error) => return Err(error),
299                                }
300                            }
301                            JointEntryRef::Directory(entry) => {
302                                let mut dir = entry
303                                    .open_with(
304                                        MissingVersionStrategy::Fail,
305                                        DirectoryFallback::Disabled,
306                                    )
307                                    .await?;
308                                match dir
309                                    .merge()
310                                    .instrument(tracing::info_span!("dir", message = name))
311                                    .await
312                                {
313                                    Ok(_) => (),
314                                    Err(Error::AmbiguousEntry) => {
315                                        conflict = true;
316                                    }
317                                    Err(error) => return Err(error),
318                                }
319                            }
320                        }
321                    }
322                }
323                Merge::Tombstone(tombstone) => {
324                    check_for_removal.push((name.to_owned(), tombstone));
325                }
326            }
327        }
328
329        // unwrap is ok because we ensured the local version exists by calling `fork` at the
330        // beginning of this function.
331        let local_version = self.local_version_mut().unwrap();
332        local_version.refresh().await?;
333
334        for (name, tombstone) in check_for_removal {
335            local_version.create_tombstone(&name, tombstone).await?;
336        }
337
338        // Need to bump the root version vector to reflect any non-filesystem changes (e.g.,
339        // removal of nodes during garbage collection).
340        if !conflict && local_version.is_root() {
341            directory::bump_root(&local_branch, new_version_vector).await?;
342        }
343
344        if tracing::enabled!(tracing::Level::TRACE) {
345            let vv = local_version.version_vector().await?;
346            tracing::trace!(?vv, ?conflict, "Merge completed");
347        }
348
349        if conflict {
350            Err(Error::AmbiguousEntry)
351        } else {
352            Ok(local_version.clone())
353        }
354    }
355
356    // Merge the version vectors of all the versions in this joint directory.
357    async fn merge_version_vectors(&self) -> Result<VersionVector> {
358        let mut outcome = VersionVector::new();
359
360        for version in self.versions.values() {
361            outcome.merge(&version.version_vector().await?);
362        }
363
364        Ok(outcome)
365    }
366
367    async fn fork(&mut self) -> Result<&mut Directory> {
368        let local_branch = self.local_branch.as_ref().ok_or(Error::PermissionDenied)?;
369
370        let mut local_version = None;
371
372        // Need to `fork` from each branch individually so that the local version vector is
373        // properly updated.
374        for (branch_id, version) in &self.versions {
375            if branch_id == local_branch.id() {
376                continue;
377            }
378
379            local_version = Some(version.fork(local_branch).await?);
380        }
381
382        if let Some(local_version) = local_version {
383            self.versions.insert(*local_branch.id(), local_version);
384        }
385
386        // TODO: This can return error only if this `JointDirectory` contains no versions which should
387        // never happen. Consider making it an invariant and doing `unwrap` / `expect` here instead.
388        self.versions
389            .get_mut(local_branch.id())
390            .ok_or(Error::EntryNotFound)
391    }
392
393    fn entry_versions<'a>(&'a self, name: &'a str) -> impl Iterator<Item = EntryRef<'a>> {
394        self.versions
395            .values()
396            .filter_map(move |v| v.lookup(name).ok())
397    }
398}
399
400impl fmt::Debug for JointDirectory {
401    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
402        f.debug_struct("JointDirectory").finish()
403    }
404}
405
406#[derive(Debug)]
407pub enum JointEntryRef<'a> {
408    File(JointFileRef<'a>),
409    Directory(JointDirectoryRef<'a>),
410}
411
412impl<'a> JointEntryRef<'a> {
413    pub fn name(&self) -> &'a str {
414        match self {
415            Self::File(r) => r.name(),
416            Self::Directory(r) => r.name(),
417        }
418    }
419
420    pub fn unique_name(&self) -> Cow<'a, str> {
421        match self {
422            Self::File(r) => r.unique_name(),
423            Self::Directory(r) => r.unique_name(),
424        }
425    }
426
427    pub fn entry_type(&self) -> EntryType {
428        match self {
429            Self::File(_) => EntryType::File,
430            Self::Directory(_) => EntryType::Directory,
431        }
432    }
433
434    pub fn version_vector(&'a self) -> Cow<'a, VersionVector> {
435        match self {
436            Self::File(r) => Cow::Borrowed(r.version_vector()),
437            Self::Directory(r) => Cow::Owned(r.version_vector()),
438        }
439    }
440
441    pub fn file(self) -> Result<FileRef<'a>> {
442        match self {
443            Self::File(r) => Ok(r.file),
444            Self::Directory(_) => Err(Error::EntryIsDirectory),
445        }
446    }
447
448    pub fn directory(self) -> Result<JointDirectoryRef<'a>> {
449        match self {
450            Self::Directory(r) => Ok(r),
451            Self::File(_) => Err(Error::EntryIsFile),
452        }
453    }
454
455    fn first_branch(&self) -> &Branch {
456        match self {
457            Self::File(r) => r.branch(),
458            Self::Directory(r) => r.first_version().branch(),
459        }
460    }
461}
462
463#[derive(Debug)]
464pub struct JointFileRef<'a> {
465    file: FileRef<'a>,
466    needs_disambiguation: bool,
467}
468
469impl<'a> JointFileRef<'a> {
470    pub fn name(&self) -> &'a str {
471        self.file.name()
472    }
473
474    pub fn unique_name(&self) -> Cow<'a, str> {
475        if self.needs_disambiguation {
476            Cow::from(conflict::create_unique_name(
477                self.name(),
478                self.file.branch().id(),
479            ))
480        } else {
481            Cow::from(self.name())
482        }
483    }
484
485    pub async fn open(&self) -> Result<File> {
486        self.file.open().await
487    }
488
489    pub(crate) async fn fork(&self, dst_branch: &Branch) -> Result<()> {
490        self.file.fork(dst_branch).await
491    }
492
493    pub fn version_vector(&self) -> &'a VersionVector {
494        self.file.version_vector()
495    }
496
497    pub fn branch(&self) -> &Branch {
498        self.file.branch()
499    }
500
501    pub fn parent(&self) -> &Directory {
502        self.file.parent()
503    }
504
505    pub fn inner(&self) -> FileRef<'a> {
506        self.file
507    }
508}
509
510pub struct JointDirectoryRef<'a> {
511    versions: Vec<DirectoryRef<'a>>,
512    local_branch: Option<&'a Branch>,
513    needs_disambiguation: bool,
514}
515
516impl<'a> JointDirectoryRef<'a> {
517    fn new(
518        versions: Vec<DirectoryRef<'a>>,
519        local_branch: Option<&'a Branch>,
520        needs_disambiguation: bool,
521    ) -> Option<Self> {
522        if versions.is_empty() {
523            None
524        } else {
525            Some(Self {
526                versions,
527                local_branch,
528                needs_disambiguation,
529            })
530        }
531    }
532
533    pub fn name(&self) -> &'a str {
534        self.first_version().name()
535    }
536
537    pub fn unique_name(&self) -> Cow<'a, str> {
538        if self.needs_disambiguation {
539            Cow::from(conflict::create_unique_name(
540                self.name(),
541                self.first_version().branch().id(),
542            ))
543        } else {
544            Cow::from(self.name())
545        }
546    }
547
548    pub fn version_vector(&self) -> VersionVector {
549        self.versions
550            .iter()
551            .fold(VersionVector::new(), |mut vv, dir| {
552                vv.merge(dir.version_vector());
553                vv
554            })
555    }
556
557    pub async fn open(&self) -> Result<JointDirectory> {
558        self.open_with(MissingVersionStrategy::Skip, DirectoryFallback::Enabled)
559            .await
560    }
561
562    pub(crate) async fn open_with(
563        &self,
564        missing_version_strategy: MissingVersionStrategy,
565        fallback: DirectoryFallback,
566    ) -> Result<JointDirectory> {
567        let mut versions = Vec::new();
568        for version in &self.versions {
569            match version.open(fallback).await {
570                Ok(open_dir) => versions.push(open_dir),
571                Err(e)
572                    if self
573                        .local_branch
574                        .map(|local_branch| version.branch().id() == local_branch.id())
575                        .unwrap_or(false) =>
576                {
577                    return Err(e)
578                }
579                Err(Error::Store(store::Error::BlockNotFound))
580                    if matches!(missing_version_strategy, MissingVersionStrategy::Skip) =>
581                {
582                    // Some of the directories on remote branches may fail due to them not yet
583                    // being fully downloaded from remote peers. This is OK and we'll treat such
584                    // cases as if this replica doesn't know about those directories.
585                    continue;
586                }
587                Err(e) => return Err(e),
588            }
589        }
590
591        Ok(JointDirectory::new(self.local_branch.cloned(), versions))
592    }
593
594    pub(crate) fn versions(&self) -> &[DirectoryRef<'_>] {
595        &self.versions
596    }
597
598    fn first_version(&self) -> &DirectoryRef<'a> {
599        self.versions
600            .first()
601            .expect("joint directory must contain at least one directory")
602    }
603}
604
605impl fmt::Debug for JointDirectoryRef<'_> {
606    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
607        f.debug_struct("JointDirectoryRef")
608            .field("name", &self.name())
609            .finish()
610    }
611}
612
/// Policy for opening a joint directory when some of its versions are not fully loaded yet.
#[derive(Copy, Clone)]
pub enum MissingVersionStrategy {
    /// Leave the not-yet-loaded versions out and open the rest.
    Skip,
    /// Abort the whole open operation with an error.
    Fail,
}
621
622// Iterator adaptor that maps iterator of `EntryRef` to iterator of `JointEntryRef` by filtering
623// out the outdated (according the their version vectors) versions and then merging all
624// `EntryRef::Directory` items into a single `JointDirectoryRef` item but keeping `EntryRef::File`
625// items separate.
626#[derive(Clone)]
627enum Merge<'a> {
628    // zero or more versions of an entry...
629    Existing(Existing<'a>),
630    // ...or a single tombstone
631    Tombstone(EntryTombstoneData),
632}
633
634#[derive(Default, Clone)]
635struct Existing<'a> {
636    // TODO: The most common case for files shall be that there will be only one version of it.
637    // Thus it might make sense to have one place holder for the first file to avoid Vec allocation
638    // when not needed.
639    files: VecDeque<FileRef<'a>>,
640    directories: Vec<DirectoryRef<'a>>,
641    needs_disambiguation: bool,
642    local_branch: Option<&'a Branch>,
643}
644
645impl<'a> Iterator for Existing<'a> {
646    type Item = JointEntryRef<'a>;
647
648    fn next(&mut self) -> Option<Self::Item> {
649        if let Some(dir) = JointDirectoryRef::new(
650            mem::take(&mut self.directories),
651            self.local_branch,
652            self.needs_disambiguation,
653        ) {
654            return Some(JointEntryRef::Directory(dir));
655        }
656
657        Some(JointEntryRef::File(JointFileRef {
658            file: self.files.pop_front()?,
659            needs_disambiguation: self.needs_disambiguation,
660        }))
661    }
662}
663
664impl<'a> Merge<'a> {
665    // All these entries are expected to have the same name. They can be either files, directories
666    // or a mix of the two.
667    fn new<I>(entries: I, local_branch: Option<&'a Branch>) -> Self
668    where
669        I: Iterator<Item = EntryRef<'a>>,
670    {
671        let mut files = VecDeque::new();
672        let mut directories = vec![];
673        let mut tombstone: Option<EntryTombstoneData> = None;
674
675        // Note that doing this will remove files that have been removed by tombstones as well.
676        let entries = versioned::keep_maximal(entries, PreferBranch(local_branch.map(Branch::id)));
677
678        for entry in entries {
679            match entry {
680                EntryRef::File(file) => files.push_back(file),
681                EntryRef::Directory(dir) => directories.push(dir),
682                EntryRef::Tombstone(_) if !files.is_empty() || !directories.is_empty() => continue,
683                EntryRef::Tombstone(new_tombstone) => {
684                    let new_tombstone = if let Some(mut old_tombstone) = tombstone.take() {
685                        old_tombstone.merge(new_tombstone.data());
686                        old_tombstone
687                    } else {
688                        new_tombstone.data().clone()
689                    };
690
691                    tombstone = Some(new_tombstone);
692                }
693            }
694        }
695
696        let needs_disambiguation = files.len() + if directories.is_empty() { 0 } else { 1 } > 1;
697
698        match tombstone {
699            Some(tombstone) if files.is_empty() && directories.is_empty() => {
700                Self::Tombstone(tombstone)
701            }
702            Some(_) | None => Self::Existing(Existing {
703                files,
704                directories,
705                needs_disambiguation,
706                local_branch,
707            }),
708        }
709    }
710
711    fn ignore_tombstones(self) -> Existing<'a> {
712        match self {
713            Self::Existing(existing) => existing,
714            Self::Tombstone(_) => Existing::default(),
715        }
716    }
717}
718
/// Selects which entries of a [`JointDirectory`] an operation applies to.
enum Pattern<'a> {
    /// Every live entry.
    All,
    /// The single entry matching the given (possibly disambiguated) unique name.
    Unique(&'a str),
}
725
726impl<'a> Pattern<'a> {
727    fn apply(&self, dir: &'a JointDirectory) -> Result<impl Iterator<Item = JointEntryRef<'a>>> {
728        match self {
729            Self::All => Ok(Either::Left(dir.entries())),
730            Self::Unique(name) => dir
731                .lookup_unique(name)
732                .map(|entry| Either::Right(iter::once(entry))),
733        }
734    }
735}