ouisync/directory/mod.rs

mod content;
mod entry;
mod entry_data;
mod entry_type;
mod parent_context;
#[cfg(test)]
mod tests;

pub use self::{
    content::VERSION as DIRECTORY_VERSION,
    entry::{DirectoryRef, EntryRef, FileRef},
    entry_type::EntryType,
};
pub(crate) use self::{
    entry_data::{EntryData, EntryTombstoneData, TombstoneCause},
    parent_context::ParentContext,
};

use self::content::Content;
use crate::{
    blob::{lock::ReadLock, Blob, BlobId},
    branch::Branch,
    crypto::sign::PublicKey,
    debug::DebugPrinter,
    error::{Error, Result},
    file::File,
    protocol::{Bump, Locator, RootNode, RootNodeFilter},
    store::{self, Changeset, ReadTransaction, WriteTransaction},
    version_vector::VersionVector,
};
use async_recursion::async_recursion;
use std::{cmp::Ordering, fmt, mem};
use tracing::instrument;

#[derive(Clone)]
pub struct Directory {
    blob: Blob,
    parent: Option<ParentContext>,
    content: Content,
    lock: Option<ReadLock>,
}

#[allow(clippy::len_without_is_empty)]
impl Directory {
    /// Opens the root directory.
    pub(crate) async fn open_root(
        branch: Branch,
        locking: DirectoryLocking,
        fallback: DirectoryFallback,
    ) -> Result<Self> {
        Self::open(branch, BlobId::ROOT, None, locking, fallback).await
    }

    /// Opens the root directory or creates it if it doesn't exist.
    ///
    /// See [`Self::create_directory`] for info about the `merge` parameter.
    #[instrument(skip(branch), fields(branch_id = ?branch.id()))]
    pub(crate) async fn open_or_create_root(branch: Branch, merge: VersionVector) -> Result<Self> {
        let blob_id = BlobId::ROOT;

        let lock = branch.locker().read(blob_id).await;
        let mut tx = branch.store().begin_write().await?;
        let mut changeset = Changeset::new();

        let dir = match Self::open_in(
            Some(lock.clone()),
            &mut tx,
            branch.clone(),
            blob_id,
            None,
            DirectoryFallback::Disabled,
        )
        .await
        {
            Ok(dir) => Some(dir),
            Err(Error::Store(store::Error::BranchNotFound)) => None,
            Err(error) => return Err(error),
        };

        let dir = if let Some(mut dir) = dir {
            if !merge.is_empty() {
                let bump = Bump::Merge(merge);
                changeset.force_bump(true);
                dir.bump(&mut tx, &mut changeset, bump).await?;
                dir.commit(tx, changeset).await?;
            }

            dir
        } else {
            let bump = if merge.is_empty() {
                Bump::increment(*branch.id())
            } else {
                Bump::Merge(merge)
            };

            let mut dir = Self::create(lock, branch, blob_id, None);
            dir.save(&mut tx, &mut changeset, &Content::empty()).await?;
            dir.bump(&mut tx, &mut changeset, bump).await?;
            dir.commit(tx, changeset).await?;
            dir
        };

        Ok(dir)
    }

    /// Reloads this directory from the db.
    pub(crate) async fn refresh(&mut self) -> Result<()> {
        let mut tx = self.branch().store().begin_read().await?;
        self.refresh_in(&mut tx).await
    }

    /// Looks up an entry of this directory by name.
    pub fn lookup(&self, name: &'_ str) -> Result<EntryRef> {
        self.content
            .get_key_value(name)
            .map(|(name, data)| EntryRef::new(self, name, data))
            .ok_or(Error::EntryNotFound)
    }

    /// Returns an iterator over the entries of this directory.
    pub fn entries(&self) -> impl DoubleEndedIterator<Item = EntryRef> + Clone {
        self.content
            .iter()
            .map(move |(name, data)| EntryRef::new(self, name, data))
    }

    /// Creates a new file inside this directory.
    pub async fn create_file(&mut self, name: String) -> Result<File> {
        let mut tx = self.branch().store().begin_write().await?;
        let mut changeset = Changeset::new();

        self.refresh_in(&mut tx).await?;

        let blob_id = rand::random();
        let version_vector = self
            .content
            .initial_version_vector(&name)
            .incremented(*self.branch().id());
        let data = EntryData::file(blob_id, version_vector);
        let parent = self.create_parent_context(name.clone());

        let mut file = File::create(self.branch().clone(), Locator::head(blob_id), parent);
        let mut content = self.content.clone();

        let diff = content.insert(name, data)?;

        file.save(&mut tx, &mut changeset).await?;
        self.save(&mut tx, &mut changeset, &content).await?;
        self.bump(&mut tx, &mut changeset, Bump::Add(diff)).await?;
        self.commit(tx, changeset).await?;
        self.finalize(content);

        Ok(file)
    }

    /// Creates a new subdirectory of this directory.
    ///
    /// `blob_id` is the blob id of the directory to be created. It must be unique. The easiest way
    /// to achieve that is to generate it randomly (it has enough bits for collisions to be
    /// astronomically unlikely).
    ///
    /// The version vector of the created subdirectory depends on the `merge` parameter:
    ///
    /// - if it is empty, it's `old_vv` with the local version incremented,
    /// - if it is non-empty, it's `old_vv` merged with `merge`,
    ///
    /// where `old_vv` is the version vector of the existing entry, or `VersionVector::new()` if no
    /// such entry exists yet.
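    ///
    /// Illustrative sketch of this rule (hypothetical ids and values, not taken from this crate's
    /// tests):
    ///
    /// ```ignore
    /// // Suppose the existing entry has old_vv = {local_id: 2}.
    /// //
    /// // `merge` is empty -> increment the local version: {local_id: 3}.
    /// let new_vv = old_vv.clone().incremented(local_id);
    ///
    /// // `merge` is non-empty, say remote_vv = {remote_id: 1} -> entry-wise maximum of the two
    /// // vectors: {local_id: 2, remote_id: 1}.
    /// let mut new_vv = old_vv.clone();
    /// new_vv.merge(&remote_vv);
    /// ```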
    #[instrument(level = "trace", skip(self))]
    pub(crate) async fn create_directory(
        &mut self,
        name: String,
        blob_id: BlobId,
        merge: &VersionVector,
    ) -> Result<Self> {
        let lock = self
            .branch()
            .locker()
            .try_read(blob_id)
            .map_err(|_| Error::EntryExists)?;

        let mut tx = self.branch().store().begin_write().await?;
        let mut changeset = Changeset::new();

        self.refresh_in(&mut tx).await?;

        let (dir, content) = self
            .create_directory_in(lock, &mut tx, &mut changeset, name, blob_id, merge)
            .await?;

        self.commit(tx, changeset).await?;
        self.finalize(content);

        Ok(dir)
    }

    async fn create_directory_in(
        &mut self,
        lock: ReadLock,
        tx: &mut WriteTransaction,
        changeset: &mut Changeset,
        name: String,
        blob_id: BlobId,
        merge: &VersionVector,
    ) -> Result<(Self, Content)> {
        let mut version_vector = self.content.initial_version_vector(&name);

        if merge.is_empty() {
            version_vector.increment(*self.branch().id())
        } else {
            version_vector.merge(merge)
        }

        let data = EntryData::directory(blob_id, version_vector);
        let parent = self.create_parent_context(name.clone());

        let mut dir = Directory::create(lock, self.branch().clone(), blob_id, Some(parent));
        let mut content = self.content.clone();

        let diff = content.insert(name, data)?;

        dir.save(tx, changeset, &Content::empty()).await?;
        self.save(tx, changeset, &content).await?;
        self.bump(tx, changeset, Bump::Add(diff)).await?;

        Ok((dir, content))
    }

    fn create_parent_context(&self, entry_name: String) -> ParentContext {
        ParentContext::new(
            *self.blob_id(),
            self.lock.clone(),
            entry_name,
            self.parent.clone(),
        )
    }

    /// Removes a file or subdirectory from this directory. If the entry to be removed is a
    /// directory, it needs to be empty or a `DirectoryNotEmpty` error is returned.
    ///
    /// `branch_id` and `version_vector` specify the entry to be removed. This makes it possible to
    /// remove entries from remote branches as well.
    ///
    /// In most cases the removal is implemented by inserting a "tombstone" in place of the entry.
    /// One exception is when removing a remote entry which also has a concurrent local version. In
    /// that case the version vector of the local entry is bumped so that it happens-after the one
    /// being removed. This effectively removes the remote entry while keeping the local one.
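    ///
    /// A sketch of the concurrent case (illustration only; `local_id` and `remote_id` are
    /// hypothetical branch ids):
    ///
    /// ```ignore
    /// // Local entry: {local_id: 1}; the removal targets the concurrent version {remote_id: 1}.
    /// let local_vv = VersionVector::new().incremented(local_id);
    /// let removed_vv = VersionVector::new().incremented(remote_id);
    /// assert_eq!(local_vv.partial_cmp(&removed_vv), None); // concurrent
    ///
    /// // The local entry is kept; its version vector is merged with the removed one and then
    /// // incremented, so it happens-after it: {local_id: 2, remote_id: 1}.
    /// let mut new_vv = local_vv;
    /// new_vv.merge(&removed_vv);
    /// new_vv.increment(local_id);
    /// ```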
    #[instrument(skip(self))]
    pub(crate) async fn remove_entry(
        &mut self,
        name: &str,
        branch_id: &PublicKey,
        version_vector: VersionVector,
    ) -> Result<()> {
        let mut tx = self.branch().store().begin_write().await?;
        let mut changeset = Changeset::new();

        // If we are removing a directory, ensure it's empty (recursive removal can still be
        // implemented at the upper layers).
        self.check_directory_empty(&mut tx, name).await?;

        let content = self
            .begin_remove_entry(
                &mut tx,
                &mut changeset,
                name,
                branch_id,
                version_vector,
                TombstoneCause::Removed,
            )
            .await?;

        self.commit(tx, changeset).await?;
        self.finalize(content);

        Ok(())
    }

    /// Creates a tombstone for the entry with the given name. If the entry exists, this
    /// effectively removes it. If it doesn't exist, it still creates the tombstone. This method is
    /// meant to be used for merging removed entries from other branches. For removing entries
    /// locally, use [`Self::remove_entry`] instead.
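    ///
    /// Illustrative usage when merging a removal from another branch (a sketch; `name` and
    /// `remote_vv` are hypothetical):
    ///
    /// ```ignore
    /// let tombstone = EntryTombstoneData::new(TombstoneCause::Removed, remote_vv);
    /// local_dir.create_tombstone(name, tombstone).await?;
    /// ```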
    #[instrument()]
    pub(crate) async fn create_tombstone(
        &mut self,
        name: &str,
        tombstone: EntryTombstoneData,
    ) -> Result<()> {
        let mut tx = self.branch().store().begin_write().await?;
        let mut changeset = Changeset::new();

        match self.lookup(name) {
            Ok(EntryRef::File(_) | EntryRef::Directory(_)) | Err(Error::EntryNotFound) => (),
            Ok(EntryRef::Tombstone(old_entry)) => {
                // Attempting to replace a tombstone with another tombstone whose version vector is
                // the same or lower is a no-op.
                if &tombstone.version_vector <= old_entry.version_vector() {
                    return Ok(());
                }
            }
            Err(e) => return Err(e),
        }

        let content = self
            .begin_insert_entry(
                &mut tx,
                &mut changeset,
                name.to_owned(),
                EntryData::Tombstone(tombstone),
            )
            .await?;

        self.commit(tx, changeset).await?;
        self.finalize(content);

        Ok(())
    }

    /// Moves the entry at `src_name` in this directory to `dst_name` in the `dst_dir` directory.
    ///
    /// It adds a tombstone where the entry is being moved from and creates a new entry at the
    /// destination.
    ///
    /// Note on why we pass `src_data` to this function instead of just looking it up using
    /// `src_name`: the version vector inside `src_data` is expected to be the one the caller of
    /// this function is trying to move. It could, in theory, happen that the source entry has been
    /// modified between when the caller obtained the entry and when we would do the lookup.
    ///
    /// Thus, by using the caller-provided version vector, we ensure that we don't accidentally
    /// delete data.
    ///
    /// To move an entry within the same directory, clone `self` and pass it as `dst_dir`.
    ///
    /// # Cancel safety
    ///
    /// This function is atomic and thus cancel safe. Either the entry is both removed from the src
    /// directory and inserted into the dst directory or, in case of error or cancellation, neither
    /// of those operations happens.
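    ///
    /// Illustrative usage for a same-directory rename (a sketch only; `dir` and `dst_vv` are
    /// hypothetical and computing `dst_vv` is up to the caller):
    ///
    /// ```ignore
    /// let src_data = dir.lookup("old.txt")?.clone_data();
    /// let mut dst_dir = dir.clone();
    /// dir.move_entry("old.txt", src_data, &mut dst_dir, "new.txt", dst_vv).await?;
    /// ```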
    pub(crate) async fn move_entry(
        &mut self,
        src_name: &str,
        src_data: EntryData,
        dst_dir: &mut Directory,
        dst_name: &str,
        dst_vv: VersionVector,
    ) -> Result<()> {
        let mut dst_data = src_data;
        let src_vv = mem::replace(dst_data.version_vector_mut(), dst_vv);

        let mut tx = self.branch().store().begin_write().await?;

        let mut changeset = Changeset::new();
        let dst_content = dst_dir
            .begin_insert_entry(&mut tx, &mut changeset, dst_name.to_owned(), dst_data)
            .await?;

        // TODO: Handle the case when `self` == `dst_dir` separately (call `refresh` and `save`
        // only once) to avoid having to apply the changeset here.
        changeset
            .apply(
                &mut tx,
                self.branch().id(),
                self.branch()
                    .keys()
                    .write()
                    .ok_or(Error::PermissionDenied)?,
            )
            .await?;

        let branch_id = *self.branch().id();
        let mut changeset = Changeset::new();
        let src_content = self
            .begin_remove_entry(
                &mut tx,
                &mut changeset,
                src_name,
                &branch_id,
                src_vv,
                TombstoneCause::Moved,
            )
            .await?;

        self.commit(tx, changeset).await?;
        self.finalize(src_content);
        dst_dir.finalize(dst_content);

        Ok(())
    }

    /// Forks this directory (but not its content) into `dst_branch`. This effectively creates an
    /// empty directory in `dst_branch` at the same path as `self`. If the dst directory already
    /// exists, it only updates its version vector and possibly its blob_id.
    ///
    /// Note this implicitly forks all the ancestor directories first.
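    ///
    /// Illustrative usage (a sketch; `remote_dir` and `local_branch` are hypothetical):
    ///
    /// ```ignore
    /// // Creates (or updates) the directory at the same path in `local_branch`, forking all of
    /// // its ancestors along the way, and returns the local counterpart.
    /// let local_dir = remote_dir.fork(&local_branch).await?;
    /// ```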
    #[instrument(name = "fork directory", skip_all)]
    #[async_recursion]
    pub(crate) async fn fork(&self, dst_branch: &Branch) -> Result<Directory> {
        if dst_branch.id() == self.branch().id() {
            return Ok(self.clone());
        }

        // Because we are transferring only the directory but not its content, we reflect that
        // by setting its version vector to what the version vector of the source directory was
        // at the time it was initially created.
        let (parent, current_vv, initial_vv) = self.prepare_fork().await?;

        if let Some((parent_dir, entry_name)) = parent {
            let mut parent_dir = parent_dir.fork(dst_branch).await?;
            let blob_id = *self.blob_id();

            parent_dir
                .fork_into(entry_name, blob_id, current_vv, initial_vv)
                .await
        } else {
            Self::open_or_create_root(dst_branch.clone(), initial_vv).await
        }
    }

    /// Prepares information needed to fork this directory.
    ///
    /// Returns the parent directory and entry name (unless this is the root) and the current and
    /// initial version vectors of this directory (the initial version vector is the one this
    /// directory had when it was first created).
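    ///
    /// Sketch of how the initial version vector is recovered (illustrative values; `a` is a
    /// hypothetical branch id):
    ///
    /// ```ignore
    /// // If current_vv = {a: 3} and the entries together contribute entries_vv = {a: 2},
    /// // the directory must have been created with initial_vv = {a: 1}.
    /// let initial_vv = current_vv.saturating_sub(&entries_vv);
    /// ```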
    async fn prepare_fork(&self) -> Result<(Option<(Self, &str)>, VersionVector, VersionVector)> {
        // Running this in a read transaction to make sure the version vector of this directory
        // and the version vectors of its entries are in sync.
        let mut tx = self.branch().store().begin_read().await?;

        let (parent, current_vv) = if let Some(parent) = &self.parent {
            let parent_dir = parent.open_in(&mut tx, self.branch().clone()).await?;
            let entry_name = parent.entry_name();
            let current_vv = parent_dir.lookup(entry_name)?.version_vector().clone();

            (Some((parent_dir, entry_name)), current_vv)
        } else {
            let current_vv = tx
                .load_latest_approved_root_node(self.branch().id(), RootNodeFilter::Any)
                .await?
                .proof
                .into_version_vector();

            (None, current_vv)
        };

        let (_, content) = self.load(&mut tx, DirectoryFallback::Disabled).await?;

        let entries_vv = content
            .iter()
            .map(|(_, entry)| entry.version_vector())
            .sum();
        let initial_vv = current_vv.saturating_sub(&entries_vv);

        Ok((parent, current_vv, initial_vv))
    }

    /// Forks a directory from a remote branch into the subdirectory at `name` in this directory.
    async fn fork_into(
        &mut self,
        name: &str,
        src_blob_id: BlobId,
        src_current_vv: VersionVector,
        src_initial_vv: VersionVector,
    ) -> Result<Self> {
        let new_lock = self.branch().locker().read(src_blob_id).await;
        let (mut tx, old_lock, old_vv) = self.begin_fork(name).await?;
        let mut changeset = Changeset::new();

        let (dir, content) = if let Some(old_lock) = old_lock {
            // Select which blob_id to use for the forked directory.
            let new_lock = match src_current_vv.partial_cmp(&old_vv) {
                Some(Ordering::Greater) => new_lock,
                Some(Ordering::Less) => old_lock.clone(),
                Some(Ordering::Equal) | None => {
                    // Break ties by arbitrarily taking the greater one. This ensures that every
                    // replica picks the same blob_id.
                    if new_lock.blob_id() > old_lock.blob_id() {
                        new_lock
                    } else {
                        old_lock.clone()
                    }
                }
            };

            self.fork_update(
                &mut tx,
                &mut changeset,
                name,
                old_lock,
                new_lock,
                src_initial_vv,
            )
            .await?
        } else {
            self.create_directory_in(
                new_lock,
                &mut tx,
                &mut changeset,
                name.to_owned(),
                src_blob_id,
                &src_initial_vv,
            )
            .await?
        };

        self.commit(tx, changeset).await?;
        self.finalize(content);

        Ok(dir)
    }

    /// Begins the forking operation into the subdirectory at `name`.
    /// Returns the write transaction, the read lock for the currently existing directory at `name`
    /// (if any) and its current version vector.
    async fn begin_fork(
        &mut self,
        name: &str,
    ) -> Result<(WriteTransaction, Option<ReadLock>, VersionVector)> {
        let mut old_blob_id = match self.lookup(name) {
            Ok(EntryRef::Directory(entry)) => Some(*entry.blob_id()),
            Ok(EntryRef::File(_) | EntryRef::Tombstone(_)) | Err(Error::EntryNotFound) => None,
            Err(error) => return Err(error),
        };

        loop {
            let old_lock = if let Some(old_blob_id) = old_blob_id {
                Some(self.branch().locker().read(old_blob_id).await)
            } else {
                None
            };

            let mut tx = self.branch().store().begin_write().await?;

            self.refresh_in(&mut tx).await?;

            match (self.lookup(name), old_lock) {
                (Ok(EntryRef::Directory(entry)), Some(old_lock))
                    if entry.blob_id() == old_lock.blob_id() =>
                {
                    return Ok((tx, Some(old_lock), entry.version_vector().clone()));
                }
                (Ok(EntryRef::Directory(entry)), _) => {
                    old_blob_id = Some(*entry.blob_id());
                    continue;
                }
                (Ok(EntryRef::Tombstone(_)) | Err(Error::EntryNotFound), None) => {
                    return Ok((tx, None, VersionVector::new()))
                }
                (Ok(EntryRef::Tombstone(_)) | Err(Error::EntryNotFound), Some(_)) => {
                    old_blob_id = None;
                    continue;
                }
                (Ok(EntryRef::File(_)), _) => return Err(Error::EntryIsFile),
                (Err(error), _) => return Err(error),
            }
        }
    }

    /// Forks into an existing subdirectory.
    ///
    /// # Panics
    ///
    /// Panics if the entry at `name` doesn't exist or is not a directory.
    async fn fork_update(
        &mut self,
        tx: &mut WriteTransaction,
        changeset: &mut Changeset,
        name: &str,
        old_lock: ReadLock,
        new_lock: ReadLock,
        initial_vv: VersionVector,
    ) -> Result<(Self, Content)> {
        let old_blob_id = *old_lock.blob_id();
        let new_blob_id = *new_lock.blob_id();

        let parent_context = self.create_parent_context(name.to_owned());

        let mut dir = Self::open_in(
            Some(old_lock),
            tx,
            self.branch().clone(),
            old_blob_id,
            Some(parent_context),
            DirectoryFallback::Disabled,
        )
        .await?;

        let mut self_content = self.content.clone();

        let entry = match self_content.get_mut(name) {
            Some(EntryData::Directory(entry)) => entry,
            Some(EntryData::File(_) | EntryData::Tombstone(_)) | None => unreachable!(),
        };

        let bump = Bump::Merge(initial_vv);
        let diff = bump.apply(&mut entry.version_vector);

        // Change the blob id
        if new_blob_id != entry.blob_id {
            // Replace and remove the old blob.
            //
            // Note the removal is not strictly necessary because the old blob would eventually be
            // removed by the garbage collector, but doing it this way improves merge performance:
            //
            // This way, when merging two branches, A and B, where A happens-after B, both
            // branches become identical (same vv, same hash) right after the merge. If we
            // delegated the removal to the gc, they would not be immediately identical - because
            // the orphaned blob would exist in one but not the other - and would only become so
            // after the gc runs. This avoids some unnecessary message exchanges during syncing.
            mem::replace(
                &mut dir.blob,
                Blob::create(self.branch().clone(), new_blob_id),
            )
            .remove(changeset);

            dir.lock = Some(new_lock);

            let dir_content = mem::replace(&mut dir.content, Content::empty());
            dir.save(tx, changeset, &dir_content).await?;

            entry.blob_id = new_blob_id;
        }

        self.save(tx, changeset, &self_content).await?;
        self.bump(tx, changeset, Bump::Add(diff)).await?;

        Ok((dir, self_content))
    }

    pub(crate) async fn parent(&self) -> Result<Option<Directory>> {
        if let Some(parent) = &self.parent {
            Ok(Some(parent.open(self.branch().clone()).await?))
        } else {
            Ok(None)
        }
    }

    pub(crate) fn is_root(&self) -> bool {
        self.parent.is_none()
    }

    async fn open_in(
        lock: Option<ReadLock>,
        tx: &mut ReadTransaction,
        branch: Branch,
        blob_id: BlobId,
        parent: Option<ParentContext>,
        fallback: DirectoryFallback,
    ) -> Result<Self> {
        let (blob, content) = load(tx, branch, blob_id, fallback).await?;

        Ok(Self {
            blob,
            parent,
            content,
            lock,
        })
    }

    async fn open_snapshot(
        tx: &mut ReadTransaction,
        branch: Branch,
        locator: Locator,
        fallback: DirectoryFallback,
    ) -> Result<Content> {
        let (_, content) = load(tx, branch, *locator.blob_id(), fallback).await?;
        Ok(content)
    }

    async fn open(
        branch: Branch,
        blob_id: BlobId,
        parent: Option<ParentContext>,
        locking: DirectoryLocking,
        fallback: DirectoryFallback,
    ) -> Result<Self> {
        let lock = match locking {
            DirectoryLocking::Enabled => Some(branch.locker().read(blob_id).await),
            DirectoryLocking::Disabled => None,
        };

        let mut tx = branch.store().begin_read().await?;
        Self::open_in(lock, &mut tx, branch, blob_id, parent, fallback).await
    }

    fn create(
        lock: ReadLock,
        branch: Branch,
        blob_id: BlobId,
        parent: Option<ParentContext>,
    ) -> Self {
        let blob = Blob::create(branch, blob_id);

        Directory {
            blob,
            parent,
            content: Content::empty(),
            lock: Some(lock),
        }
    }

    #[async_recursion]
    pub async fn debug_print(&self, print: DebugPrinter) {
        for (name, entry_data) in &self.content {
            print.display(&format_args!("{:?}: {:?}", name, entry_data));

            match entry_data {
                EntryData::File(file_data) => {
                    let print = print.indent();

                    let parent_context = self.create_parent_context(name.into());
                    let file = File::open(
                        self.blob.branch().clone(),
                        Locator::head(file_data.blob_id),
                        parent_context,
                    )
                    .await;

                    match file {
                        Ok(mut file) => {
                            let mut buf = [0; 32];
                            let length_result = file.read(&mut buf).await;
                            match length_result {
                                Ok(length) => {
                                    let file_len = file.len();
                                    let ellipsis = if file_len > length as u64 { ".." } else { "" };
                                    print.display(&format!(
                                        "Content: {:?}{}",
                                        std::str::from_utf8(&buf[..length]),
                                        ellipsis
                                    ));
                                }
                                Err(e) => {
                                    print.display(&format!("Failed to read {:?}", e));
                                }
                            }
                        }
                        Err(e) => {
                            print.display(&format!("Failed to open {:?}", e));
                        }
                    }
                }
                EntryData::Directory(data) => {
                    let print = print.indent();

                    let parent_context = self.create_parent_context(name.into());
                    let dir = Directory::open(
                        self.blob.branch().clone(),
                        data.blob_id,
                        Some(parent_context),
                        DirectoryLocking::Disabled,
                        DirectoryFallback::Disabled,
                    )
                    .await;

                    match dir {
                        Ok(dir) => {
                            dir.debug_print(print).await;
                        }
                        Err(e) => {
                            print.display(&format!("Failed to open {:?}", e));
                        }
                    }
                }
                EntryData::Tombstone(_) => {}
            }
        }
    }

    /// Branch of this directory
    pub fn branch(&self) -> &Branch {
        self.blob.branch()
    }

    /// Blob id of this directory
    pub(crate) fn blob_id(&self) -> &BlobId {
        self.blob.id()
    }

    /// Length of this directory in bytes. Does not include the content, only the size of the
    /// directory itself.
    pub fn len(&self) -> u64 {
        self.blob.len()
    }

    pub(crate) async fn version_vector(&self) -> Result<VersionVector> {
        if let Some(parent) = &self.parent {
            parent.entry_version_vector(self.branch().clone()).await
        } else {
            self.branch().version_vector().await
        }
    }

    async fn begin_remove_entry(
        &mut self,
        tx: &mut ReadTransaction,
        changeset: &mut Changeset,
        name: &str,
        branch_id: &PublicKey,
        version_vector: VersionVector,
        cause: TombstoneCause,
    ) -> Result<Content> {
        let mut new_data = match self.lookup(name) {
            Ok(old_entry)
                if branch_id != self.branch().id()
                    && old_entry
                        .version_vector()
                        .partial_cmp(&version_vector)
                        .is_none() =>
            {
                let mut new_data = old_entry.clone_data();
                new_data.version_vector_mut().merge(&version_vector);
                new_data
            }
            Ok(_) | Err(Error::EntryNotFound) => {
                EntryData::Tombstone(EntryTombstoneData::new(cause, version_vector))
            }
            Err(error) => return Err(error),
        };

        new_data.version_vector_mut().increment(*self.branch().id());

        self.begin_insert_entry(tx, changeset, name.to_owned(), new_data)
            .await
    }

    async fn check_directory_empty(&self, tx: &mut ReadTransaction, name: &str) -> Result<()> {
        match self.lookup(name) {
            Ok(EntryRef::Directory(entry)) => {
                if entry
                    .open_snapshot(tx, DirectoryFallback::Disabled)
                    .await?
                    .iter()
                    .all(|(_, data)| matches!(data, EntryData::Tombstone(_)))
                {
                    Ok(())
                } else {
                    Err(Error::DirectoryNotEmpty)
                }
            }
            Ok(_) | Err(Error::EntryNotFound) => Ok(()),
            Err(error) => Err(error),
        }
    }

    async fn begin_insert_entry(
        &mut self,
        tx: &mut ReadTransaction,
        changeset: &mut Changeset,
        name: String,
        data: EntryData,
    ) -> Result<Content> {
        self.refresh_in(tx).await?;

        let mut content = self.content.clone();
        let diff = content.insert(name, data)?;
        self.save(tx, changeset, &content).await?;
        self.bump(tx, changeset, Bump::Add(diff)).await?;

        Ok(content)
    }

    async fn refresh_in(&mut self, tx: &mut ReadTransaction) -> Result<()> {
        if self.blob.is_dirty() {
            Ok(())
        } else {
            let (blob, content) = self.load(tx, DirectoryFallback::Disabled).await?;
            self.blob = blob;
            self.content = content;

            Ok(())
        }
    }

    async fn load(
        &self,
        tx: &mut ReadTransaction,
        fallback: DirectoryFallback,
    ) -> Result<(Blob, Content)> {
        load(tx, self.branch().clone(), *self.blob_id(), fallback).await
    }

    async fn save(
        &mut self,
        tx: &mut ReadTransaction,
        changeset: &mut Changeset,
        content: &Content,
    ) -> Result<()> {
        // Save the directory content into the store
        let buffer = content.serialize();
        self.blob.truncate(0)?;
        self.blob.write_all(tx, changeset, &buffer).await?;
        self.blob.flush(tx, changeset).await?;

        Ok(())
    }

    /// Atomically commits the transaction and sends a notification event.
    async fn commit(&mut self, tx: WriteTransaction, changeset: Changeset) -> Result<()> {
        commit(tx, changeset, self.branch()).await
    }

    /// Updates the version vectors of this directory and all its ancestors.
    #[async_recursion]
    async fn bump(
        &mut self,
        tx: &mut ReadTransaction,
        changeset: &mut Changeset,
        bump: Bump,
    ) -> Result<()> {
        // Update the version vector of this directory and all its ancestors
        if let Some(parent) = self.parent.as_mut() {
            parent
                .bump(tx, changeset, self.blob.branch().clone(), bump)
                .await
        } else {
            changeset.bump(bump);
            Ok(())
        }
    }

    /// Finalize pending modifications. Call this only after the db transaction has been committed.
    fn finalize(&mut self, content: Content) {
        self.content = content;
    }
}

impl fmt::Debug for Directory {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("Directory")
            .field(
                "name",
                &self
                    .parent
                    .as_ref()
                    .map(|parent| parent.entry_name())
                    .unwrap_or("/"),
            )
            .field("branch", self.branch().id())
            .field("blob_id", self.blob_id())
            .finish()
    }
}

/// Enable/disable fallback to previous snapshots in case of missing blocks.
#[derive(Clone, Copy)]
pub(crate) enum DirectoryFallback {
    Enabled,
    Disabled,
}

/// Enable/disable acquiring a read lock before creating/opening the directory.
#[derive(Clone, Copy)]
pub(crate) enum DirectoryLocking {
    Enabled,
    Disabled,
}

/// Update the root version vector of the given branch by merging it with `merge`.
/// If `merge` is less than or equal to the current root version vector, this is a no-op.
#[instrument(skip(branch), fields(writer_id = ?branch.id()))]
pub(crate) async fn bump_root(branch: &Branch, merge: VersionVector) -> Result<()> {
    let tx = branch.store().begin_write().await?;
    let mut changeset = Changeset::new();
    changeset.force_bump(true);
    changeset.bump(Bump::Merge(merge));
    commit(tx, changeset, branch).await
}

// Load directory content. On a missing block, fall back to the previous snapshot (if any).
async fn load(
    tx: &mut ReadTransaction,
    branch: Branch,
    blob_id: BlobId,
    fallback: DirectoryFallback,
) -> Result<(Blob, Content)> {
    let mut root_node = tx
        .load_latest_approved_root_node(branch.id(), RootNodeFilter::Any)
        .await?;

    loop {
        let error = match load_at(tx, &root_node, branch.clone(), blob_id).await {
            Ok((blob, content)) => return Ok((blob, content)),
            Err(error @ Error::Store(store::Error::BlockNotFound)) => error,
            Err(error) => return Err(error),
        };

        match fallback {
            DirectoryFallback::Enabled => (),
            DirectoryFallback::Disabled => return Err(error),
        }

        if let Some(prev) = tx.load_prev_approved_root_node(&root_node).await? {
            root_node = prev;
        } else {
            return Err(error);
        }
    }
}

async fn load_at(
    tx: &mut ReadTransaction,
    root_node: &RootNode,
    branch: Branch,
    blob_id: BlobId,
) -> Result<(Blob, Content)> {
    let mut blob = Blob::open_at(tx, root_node, branch, blob_id).await?;
    let buffer = blob.read_to_end_at(tx, root_node).await?;
    let content = Content::deserialize(&buffer)?;

    Ok((blob, content))
}

/// Apply the changeset, commit the transaction and send a notification event.
async fn commit(mut tx: WriteTransaction, changeset: Changeset, branch: &Branch) -> Result<()> {
    let changed = changeset
        .apply(
            &mut tx,
            branch.id(),
            branch.keys().write().ok_or(Error::PermissionDenied)?,
        )
        .await?;

    if !changed {
        return Ok(());
    }

    let event_tx = branch.notify();
    tx.commit_and_then(move || event_tx.send()).await?;

    Ok(())
}