1mod content;
2mod entry;
3mod entry_data;
4mod entry_type;
5mod parent_context;
6#[cfg(test)]
7mod tests;
8
9pub use self::{
10 content::VERSION as DIRECTORY_VERSION,
11 entry::{DirectoryRef, EntryRef, FileRef},
12 entry_type::EntryType,
13};
14pub(crate) use self::{
15 entry_data::{EntryData, EntryTombstoneData, TombstoneCause},
16 parent_context::ParentContext,
17};
18
19use self::content::Content;
20use crate::{
21 blob::{lock::ReadLock, Blob, BlobId},
22 branch::Branch,
23 crypto::sign::PublicKey,
24 debug::DebugPrinter,
25 error::{Error, Result},
26 file::File,
27 protocol::{Bump, Locator, RootNode, RootNodeFilter},
28 store::{self, Changeset, ReadTransaction, WriteTransaction},
29 version_vector::VersionVector,
30};
31use async_recursion::async_recursion;
32use std::{cmp::Ordering, fmt, mem};
33use tracing::instrument;
34
35#[derive(Clone)]
36pub struct Directory {
37 blob: Blob,
38 parent: Option<ParentContext>,
39 content: Content,
40 lock: Option<ReadLock>,
41}
42
43#[allow(clippy::len_without_is_empty)]
44impl Directory {
45 pub(crate) async fn open_root(
47 branch: Branch,
48 locking: DirectoryLocking,
49 fallback: DirectoryFallback,
50 ) -> Result<Self> {
51 Self::open(branch, BlobId::ROOT, None, locking, fallback).await
52 }
53
54 #[instrument(skip(branch), fields(branch_id = ?branch.id()))]
58 pub(crate) async fn open_or_create_root(branch: Branch, merge: VersionVector) -> Result<Self> {
59 let blob_id = BlobId::ROOT;
60
61 let lock = branch.locker().read(blob_id).await;
62 let mut tx = branch.store().begin_write().await?;
63 let mut changeset = Changeset::new();
64
65 let dir = match Self::open_in(
66 Some(lock.clone()),
67 &mut tx,
68 branch.clone(),
69 blob_id,
70 None,
71 DirectoryFallback::Disabled,
72 )
73 .await
74 {
75 Ok(dir) => Some(dir),
76 Err(Error::Store(store::Error::BranchNotFound)) => None,
77 Err(error) => return Err(error),
78 };
79
80 let dir = if let Some(mut dir) = dir {
81 if !merge.is_empty() {
82 let bump = Bump::Merge(merge);
83 changeset.force_bump(true);
84 dir.bump(&mut tx, &mut changeset, bump).await?;
85 dir.commit(tx, changeset).await?;
86 }
87
88 dir
89 } else {
90 let bump = if merge.is_empty() {
91 Bump::increment(*branch.id())
92 } else {
93 Bump::Merge(merge)
94 };
95
96 let mut dir = Self::create(lock, branch, blob_id, None);
97 dir.save(&mut tx, &mut changeset, &Content::empty()).await?;
98 dir.bump(&mut tx, &mut changeset, bump).await?;
99 dir.commit(tx, changeset).await?;
100 dir
101 };
102
103 Ok(dir)
104 }
105
106 pub(crate) async fn refresh(&mut self) -> Result<()> {
108 let mut tx = self.branch().store().begin_read().await?;
109 self.refresh_in(&mut tx).await
110 }
111
112 pub fn lookup(&self, name: &'_ str) -> Result<EntryRef> {
114 self.content
115 .get_key_value(name)
116 .map(|(name, data)| EntryRef::new(self, name, data))
117 .ok_or(Error::EntryNotFound)
118 }
119
120 pub fn entries(&self) -> impl DoubleEndedIterator<Item = EntryRef> + Clone {
122 self.content
123 .iter()
124 .map(move |(name, data)| EntryRef::new(self, name, data))
125 }
126
127 pub async fn create_file(&mut self, name: String) -> Result<File> {
129 let mut tx = self.branch().store().begin_write().await?;
130 let mut changeset = Changeset::new();
131
132 self.refresh_in(&mut tx).await?;
133
134 let blob_id = rand::random();
135 let version_vector = self
136 .content
137 .initial_version_vector(&name)
138 .incremented(*self.branch().id());
139 let data = EntryData::file(blob_id, version_vector);
140 let parent = self.create_parent_context(name.clone());
141
142 let mut file = File::create(self.branch().clone(), Locator::head(blob_id), parent);
143 let mut content = self.content.clone();
144
145 let diff = content.insert(name, data)?;
146
147 file.save(&mut tx, &mut changeset).await?;
148 self.save(&mut tx, &mut changeset, &content).await?;
149 self.bump(&mut tx, &mut changeset, Bump::Add(diff)).await?;
150 self.commit(tx, changeset).await?;
151 self.finalize(content);
152
153 Ok(file)
154 }
155
156 #[instrument(level = "trace", skip(self))]
170 pub(crate) async fn create_directory(
171 &mut self,
172 name: String,
173 blob_id: BlobId,
174 merge: &VersionVector,
175 ) -> Result<Self> {
176 let lock = self
177 .branch()
178 .locker()
179 .try_read(blob_id)
180 .map_err(|_| Error::EntryExists)?;
181
182 let mut tx = self.branch().store().begin_write().await?;
183 let mut changeset = Changeset::new();
184
185 self.refresh_in(&mut tx).await?;
186
187 let (dir, content) = self
188 .create_directory_in(lock, &mut tx, &mut changeset, name, blob_id, merge)
189 .await?;
190
191 self.commit(tx, changeset).await?;
192 self.finalize(content);
193
194 Ok(dir)
195 }
196
197 async fn create_directory_in(
198 &mut self,
199 lock: ReadLock,
200 tx: &mut WriteTransaction,
201 changeset: &mut Changeset,
202 name: String,
203 blob_id: BlobId,
204 merge: &VersionVector,
205 ) -> Result<(Self, Content)> {
206 let mut version_vector = self.content.initial_version_vector(&name);
207
208 if merge.is_empty() {
209 version_vector.increment(*self.branch().id())
210 } else {
211 version_vector.merge(merge)
212 }
213
214 let data = EntryData::directory(blob_id, version_vector);
215 let parent = self.create_parent_context(name.clone());
216
217 let mut dir = Directory::create(lock, self.branch().clone(), blob_id, Some(parent));
218 let mut content = self.content.clone();
219
220 let diff = content.insert(name, data)?;
221
222 dir.save(tx, changeset, &Content::empty()).await?;
223 self.save(tx, changeset, &content).await?;
224 self.bump(tx, changeset, Bump::Add(diff)).await?;
225
226 Ok((dir, content))
227 }
228
229 fn create_parent_context(&self, entry_name: String) -> ParentContext {
230 ParentContext::new(
231 *self.blob_id(),
232 self.lock.clone(),
233 entry_name,
234 self.parent.clone(),
235 )
236 }
237
238 #[instrument(skip(self))]
249 pub(crate) async fn remove_entry(
250 &mut self,
251 name: &str,
252 branch_id: &PublicKey,
253 version_vector: VersionVector,
254 ) -> Result<()> {
255 let mut tx = self.branch().store().begin_write().await?;
256 let mut changeset = Changeset::new();
257
258 self.check_directory_empty(&mut tx, name).await?;
261
262 let content = self
263 .begin_remove_entry(
264 &mut tx,
265 &mut changeset,
266 name,
267 branch_id,
268 version_vector,
269 TombstoneCause::Removed,
270 )
271 .await?;
272
273 self.commit(tx, changeset).await?;
274 self.finalize(content);
275
276 Ok(())
277 }
278
279 #[instrument()]
284 pub(crate) async fn create_tombstone(
285 &mut self,
286 name: &str,
287 tombstone: EntryTombstoneData,
288 ) -> Result<()> {
289 let mut tx = self.branch().store().begin_write().await?;
290 let mut changeset = Changeset::new();
291
292 match self.lookup(name) {
293 Ok(EntryRef::File(_) | EntryRef::Directory(_)) | Err(Error::EntryNotFound) => (),
294 Ok(EntryRef::Tombstone(old_entry)) => {
295 if &tombstone.version_vector <= old_entry.version_vector() {
298 return Ok(());
299 }
300 }
301 Err(e) => return Err(e),
302 }
303
304 let content = self
305 .begin_insert_entry(
306 &mut tx,
307 &mut changeset,
308 name.to_owned(),
309 EntryData::Tombstone(tombstone),
310 )
311 .await?;
312
313 self.commit(tx, changeset).await?;
314 self.finalize(content);
315
316 Ok(())
317 }
318
319 pub(crate) async fn move_entry(
341 &mut self,
342 src_name: &str,
343 src_data: EntryData,
344 dst_dir: &mut Directory,
345 dst_name: &str,
346 dst_vv: VersionVector,
347 ) -> Result<()> {
348 let mut dst_data = src_data;
349 let src_vv = mem::replace(dst_data.version_vector_mut(), dst_vv);
350
351 let mut tx = self.branch().store().begin_write().await?;
352
353 let mut changeset = Changeset::new();
354 let dst_content = dst_dir
355 .begin_insert_entry(&mut tx, &mut changeset, dst_name.to_owned(), dst_data)
356 .await?;
357
358 changeset
361 .apply(
362 &mut tx,
363 self.branch().id(),
364 self.branch()
365 .keys()
366 .write()
367 .ok_or(Error::PermissionDenied)?,
368 )
369 .await?;
370
371 let branch_id = *self.branch().id();
372 let mut changeset = Changeset::new();
373 let src_content = self
374 .begin_remove_entry(
375 &mut tx,
376 &mut changeset,
377 src_name,
378 &branch_id,
379 src_vv,
380 TombstoneCause::Moved,
381 )
382 .await?;
383
384 self.commit(tx, changeset).await?;
385 self.finalize(src_content);
386 dst_dir.finalize(dst_content);
387
388 Ok(())
389 }
390
391 #[instrument(name = "fork directory", skip_all)]
397 #[async_recursion]
398 pub(crate) async fn fork(&self, dst_branch: &Branch) -> Result<Directory> {
399 if dst_branch.id() == self.branch().id() {
400 return Ok(self.clone());
401 }
402
403 let (parent, current_vv, initial_vv) = self.prepare_fork().await?;
407
408 if let Some((parent_dir, entry_name)) = parent {
409 let mut parent_dir = parent_dir.fork(dst_branch).await?;
410 let blob_id = *self.blob_id();
411
412 parent_dir
413 .fork_into(entry_name, blob_id, current_vv, initial_vv)
414 .await
415 } else {
416 Self::open_or_create_root(dst_branch.clone(), initial_vv).await
417 }
418 }
419
420 async fn prepare_fork(&self) -> Result<(Option<(Self, &str)>, VersionVector, VersionVector)> {
426 let mut tx = self.branch().store().begin_read().await?;
429
430 let (parent, current_vv) = if let Some(parent) = &self.parent {
431 let parent_dir = parent.open_in(&mut tx, self.branch().clone()).await?;
432 let entry_name = parent.entry_name();
433 let current_vv = parent_dir.lookup(entry_name)?.version_vector().clone();
434
435 (Some((parent_dir, entry_name)), current_vv)
436 } else {
437 let current_vv = tx
438 .load_latest_approved_root_node(self.branch().id(), RootNodeFilter::Any)
439 .await?
440 .proof
441 .into_version_vector();
442
443 (None, current_vv)
444 };
445
446 let (_, content) = self.load(&mut tx, DirectoryFallback::Disabled).await?;
447
448 let entries_vv = content
449 .iter()
450 .map(|(_, entry)| entry.version_vector())
451 .sum();
452 let initial_vv = current_vv.saturating_sub(&entries_vv);
453
454 Ok((parent, current_vv, initial_vv))
455 }
456
457 async fn fork_into(
459 &mut self,
460 name: &str,
461 src_blob_id: BlobId,
462 src_current_vv: VersionVector,
463 src_initial_vv: VersionVector,
464 ) -> Result<Self> {
465 let new_lock = self.branch().locker().read(src_blob_id).await;
466 let (mut tx, old_lock, old_vv) = self.begin_fork(name).await?;
467 let mut changeset = Changeset::new();
468
469 let (dir, content) = if let Some(old_lock) = old_lock {
470 let new_lock = match src_current_vv.partial_cmp(&old_vv) {
472 Some(Ordering::Greater) => new_lock,
473 Some(Ordering::Less) => old_lock.clone(),
474 Some(Ordering::Equal) | None => {
475 if new_lock.blob_id() > old_lock.blob_id() {
478 new_lock
479 } else {
480 old_lock.clone()
481 }
482 }
483 };
484
485 self.fork_update(
486 &mut tx,
487 &mut changeset,
488 name,
489 old_lock,
490 new_lock,
491 src_initial_vv,
492 )
493 .await?
494 } else {
495 self.create_directory_in(
496 new_lock,
497 &mut tx,
498 &mut changeset,
499 name.to_owned(),
500 src_blob_id,
501 &src_initial_vv,
502 )
503 .await?
504 };
505
506 self.commit(tx, changeset).await?;
507 self.finalize(content);
508
509 Ok(dir)
510 }
511
512 async fn begin_fork(
516 &mut self,
517 name: &str,
518 ) -> Result<(WriteTransaction, Option<ReadLock>, VersionVector)> {
519 let mut old_blob_id = match self.lookup(name) {
520 Ok(EntryRef::Directory(entry)) => Some(*entry.blob_id()),
521 Ok(EntryRef::File(_) | EntryRef::Tombstone(_)) | Err(Error::EntryNotFound) => None,
522 Err(error) => return Err(error),
523 };
524
525 loop {
526 let old_lock = if let Some(old_blob_id) = old_blob_id {
527 Some(self.branch().locker().read(old_blob_id).await)
528 } else {
529 None
530 };
531
532 let mut tx = self.branch().store().begin_write().await?;
533
534 self.refresh_in(&mut tx).await?;
535
536 match (self.lookup(name), old_lock) {
537 (Ok(EntryRef::Directory(entry)), Some(old_lock))
538 if entry.blob_id() == old_lock.blob_id() =>
539 {
540 return Ok((tx, Some(old_lock), entry.version_vector().clone()));
541 }
542 (Ok(EntryRef::Directory(entry)), _) => {
543 old_blob_id = Some(*entry.blob_id());
544 continue;
545 }
546 (Ok(EntryRef::Tombstone(_)) | Err(Error::EntryNotFound), None) => {
547 return Ok((tx, None, VersionVector::new()))
548 }
549 (Ok(EntryRef::Tombstone(_)) | Err(Error::EntryNotFound), Some(_)) => {
550 old_blob_id = None;
551 continue;
552 }
553 (Ok(EntryRef::File(_)), _) => return Err(Error::EntryIsFile),
554 (Err(error), _) => return Err(error),
555 }
556 }
557 }
558
559 async fn fork_update(
565 &mut self,
566 tx: &mut WriteTransaction,
567 changeset: &mut Changeset,
568 name: &str,
569 old_lock: ReadLock,
570 new_lock: ReadLock,
571 initial_vv: VersionVector,
572 ) -> Result<(Self, Content)> {
573 let old_blob_id = *old_lock.blob_id();
574 let new_blob_id = *new_lock.blob_id();
575
576 let parent_context = self.create_parent_context(name.to_owned());
577
578 let mut dir = Self::open_in(
579 Some(old_lock),
580 tx,
581 self.branch().clone(),
582 old_blob_id,
583 Some(parent_context),
584 DirectoryFallback::Disabled,
585 )
586 .await?;
587
588 let mut self_content = self.content.clone();
589
590 let entry = match self_content.get_mut(name) {
591 Some(EntryData::Directory(entry)) => entry,
592 Some(EntryData::File(_) | EntryData::Tombstone(_)) | None => unreachable!(),
593 };
594
595 let bump = Bump::Merge(initial_vv);
596 let diff = bump.apply(&mut entry.version_vector);
597
598 if new_blob_id != entry.blob_id {
600 mem::replace(
611 &mut dir.blob,
612 Blob::create(self.branch().clone(), new_blob_id),
613 )
614 .remove(changeset);
615
616 dir.lock = Some(new_lock);
617
618 let dir_content = mem::replace(&mut dir.content, Content::empty());
619 dir.save(tx, changeset, &dir_content).await?;
620
621 entry.blob_id = new_blob_id;
622 }
623
624 self.save(tx, changeset, &self_content).await?;
625 self.bump(tx, changeset, Bump::Add(diff)).await?;
626
627 Ok((dir, self_content))
628 }
629
630 pub(crate) async fn parent(&self) -> Result<Option<Directory>> {
631 if let Some(parent) = &self.parent {
632 Ok(Some(parent.open(self.branch().clone()).await?))
633 } else {
634 Ok(None)
635 }
636 }
637
638 pub(crate) fn is_root(&self) -> bool {
639 self.parent.is_none()
640 }
641
642 async fn open_in(
643 lock: Option<ReadLock>,
644 tx: &mut ReadTransaction,
645 branch: Branch,
646 blob_id: BlobId,
647 parent: Option<ParentContext>,
648 fallback: DirectoryFallback,
649 ) -> Result<Self> {
650 let (blob, content) = load(tx, branch, blob_id, fallback).await?;
651
652 Ok(Self {
653 blob,
654 parent,
655 content,
656 lock,
657 })
658 }
659
660 async fn open_snapshot(
661 tx: &mut ReadTransaction,
662 branch: Branch,
663 locator: Locator,
664 fallback: DirectoryFallback,
665 ) -> Result<Content> {
666 let (_, content) = load(tx, branch, *locator.blob_id(), fallback).await?;
667 Ok(content)
668 }
669
670 async fn open(
671 branch: Branch,
672 blob_id: BlobId,
673 parent: Option<ParentContext>,
674 locking: DirectoryLocking,
675 fallback: DirectoryFallback,
676 ) -> Result<Self> {
677 let lock = match locking {
678 DirectoryLocking::Enabled => Some(branch.locker().read(blob_id).await),
679 DirectoryLocking::Disabled => None,
680 };
681
682 let mut tx = branch.store().begin_read().await?;
683 Self::open_in(lock, &mut tx, branch, blob_id, parent, fallback).await
684 }
685
686 fn create(
687 lock: ReadLock,
688 branch: Branch,
689 blob_id: BlobId,
690 parent: Option<ParentContext>,
691 ) -> Self {
692 let blob = Blob::create(branch, blob_id);
693
694 Directory {
695 blob,
696 parent,
697 content: Content::empty(),
698 lock: Some(lock),
699 }
700 }
701
702 #[async_recursion]
703 pub async fn debug_print(&self, print: DebugPrinter) {
704 for (name, entry_data) in &self.content {
705 print.display(&format_args!("{:?}: {:?}", name, entry_data));
706
707 match entry_data {
708 EntryData::File(file_data) => {
709 let print = print.indent();
710
711 let parent_context = self.create_parent_context(name.into());
712 let file = File::open(
713 self.blob.branch().clone(),
714 Locator::head(file_data.blob_id),
715 parent_context,
716 )
717 .await;
718
719 match file {
720 Ok(mut file) => {
721 let mut buf = [0; 32];
722 let lenght_result = file.read(&mut buf).await;
723 match lenght_result {
724 Ok(length) => {
725 let file_len = file.len();
726 let ellipsis = if file_len > length as u64 { ".." } else { "" };
727 print.display(&format!(
728 "Content: {:?}{}",
729 std::str::from_utf8(&buf[..length]),
730 ellipsis
731 ));
732 }
733 Err(e) => {
734 print.display(&format!("Failed to read {:?}", e));
735 }
736 }
737 }
738 Err(e) => {
739 print.display(&format!("Failed to open {:?}", e));
740 }
741 }
742 }
743 EntryData::Directory(data) => {
744 let print = print.indent();
745
746 let parent_context = self.create_parent_context(name.into());
747 let dir = Directory::open(
748 self.blob.branch().clone(),
749 data.blob_id,
750 Some(parent_context),
751 DirectoryLocking::Disabled,
752 DirectoryFallback::Disabled,
753 )
754 .await;
755
756 match dir {
757 Ok(dir) => {
758 dir.debug_print(print).await;
759 }
760 Err(e) => {
761 print.display(&format!("Failed to open {:?}", e));
762 }
763 }
764 }
765 EntryData::Tombstone(_) => {}
766 }
767 }
768 }
769
770 pub fn branch(&self) -> &Branch {
772 self.blob.branch()
773 }
774
775 pub(crate) fn blob_id(&self) -> &BlobId {
777 self.blob.id()
778 }
779
780 pub fn len(&self) -> u64 {
783 self.blob.len()
784 }
785
786 pub(crate) async fn version_vector(&self) -> Result<VersionVector> {
787 if let Some(parent) = &self.parent {
788 parent.entry_version_vector(self.branch().clone()).await
789 } else {
790 self.branch().version_vector().await
791 }
792 }
793
794 async fn begin_remove_entry(
795 &mut self,
796 tx: &mut ReadTransaction,
797 changeset: &mut Changeset,
798 name: &str,
799 branch_id: &PublicKey,
800 version_vector: VersionVector,
801 cause: TombstoneCause,
802 ) -> Result<Content> {
803 let mut new_data = match self.lookup(name) {
804 Ok(old_entry)
805 if branch_id != self.branch().id()
806 && old_entry
807 .version_vector()
808 .partial_cmp(&version_vector)
809 .is_none() =>
810 {
811 let mut new_data = old_entry.clone_data();
812 new_data.version_vector_mut().merge(&version_vector);
813 new_data
814 }
815 Ok(_) | Err(Error::EntryNotFound) => {
816 EntryData::Tombstone(EntryTombstoneData::new(cause, version_vector))
817 }
818 Err(error) => return Err(error),
819 };
820
821 new_data.version_vector_mut().increment(*self.branch().id());
822
823 self.begin_insert_entry(tx, changeset, name.to_owned(), new_data)
824 .await
825 }
826
827 async fn check_directory_empty(&self, tx: &mut ReadTransaction, name: &str) -> Result<()> {
828 match self.lookup(name) {
829 Ok(EntryRef::Directory(entry)) => {
830 if entry
831 .open_snapshot(tx, DirectoryFallback::Disabled)
832 .await?
833 .iter()
834 .all(|(_, data)| matches!(data, EntryData::Tombstone(_)))
835 {
836 Ok(())
837 } else {
838 Err(Error::DirectoryNotEmpty)
839 }
840 }
841 Ok(_) | Err(Error::EntryNotFound) => Ok(()),
842 Err(error) => Err(error),
843 }
844 }
845
846 async fn begin_insert_entry(
847 &mut self,
848 tx: &mut ReadTransaction,
849 changeset: &mut Changeset,
850 name: String,
851 data: EntryData,
852 ) -> Result<Content> {
853 self.refresh_in(tx).await?;
854
855 let mut content = self.content.clone();
856 let diff = content.insert(name, data)?;
857 self.save(tx, changeset, &content).await?;
858 self.bump(tx, changeset, Bump::Add(diff)).await?;
859
860 Ok(content)
861 }
862
863 async fn refresh_in(&mut self, tx: &mut ReadTransaction) -> Result<()> {
864 if self.blob.is_dirty() {
865 Ok(())
866 } else {
867 let (blob, content) = self.load(tx, DirectoryFallback::Disabled).await?;
868 self.blob = blob;
869 self.content = content;
870
871 Ok(())
872 }
873 }
874
875 async fn load(
876 &self,
877 tx: &mut ReadTransaction,
878 fallback: DirectoryFallback,
879 ) -> Result<(Blob, Content)> {
880 load(tx, self.branch().clone(), *self.blob_id(), fallback).await
881 }
882
883 async fn save(
884 &mut self,
885 tx: &mut ReadTransaction,
886 changeset: &mut Changeset,
887 content: &Content,
888 ) -> Result<()> {
889 let buffer = content.serialize();
891 self.blob.truncate(0)?;
892 self.blob.write_all(tx, changeset, &buffer).await?;
893 self.blob.flush(tx, changeset).await?;
894
895 Ok(())
896 }
897
898 async fn commit(&mut self, tx: WriteTransaction, changeset: Changeset) -> Result<()> {
900 commit(tx, changeset, self.branch()).await
901 }
902
903 #[async_recursion]
905 async fn bump(
906 &mut self,
907 tx: &mut ReadTransaction,
908 changeset: &mut Changeset,
909 bump: Bump,
910 ) -> Result<()> {
911 if let Some(parent) = self.parent.as_mut() {
913 parent
914 .bump(tx, changeset, self.blob.branch().clone(), bump)
915 .await
916 } else {
917 changeset.bump(bump);
918 Ok(())
919 }
920 }
921
922 fn finalize(&mut self, content: Content) {
924 self.content = content;
925 }
926}
927
928impl fmt::Debug for Directory {
929 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
930 f.debug_struct("Directory")
931 .field(
932 "name",
933 &self
934 .parent
935 .as_ref()
936 .map(|parent| parent.entry_name())
937 .unwrap_or("/"),
938 )
939 .field("branch", self.branch().id())
940 .field("blob_id", self.blob_id())
941 .finish()
942 }
943}
944
/// Selects whether loading a directory may fall back to previous approved root nodes when
/// a block of the current one is missing.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum DirectoryFallback {
    /// On a missing block, retry with the previous approved root node.
    Enabled,
    /// On a missing block, fail immediately.
    Disabled,
}
951
/// Selects whether opening a directory acquires a read lock for its blob.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum DirectoryLocking {
    /// Acquire a read lock before opening.
    Enabled,
    /// Open without a lock.
    Disabled,
}
958
959#[instrument(skip(branch), fields(writer_id = ?branch.id()))]
962pub(crate) async fn bump_root(branch: &Branch, merge: VersionVector) -> Result<()> {
963 let tx = branch.store().begin_write().await?;
964 let mut changeset = Changeset::new();
965 changeset.force_bump(true);
966 changeset.bump(Bump::Merge(merge));
967 commit(tx, changeset, branch).await
968}
969
970async fn load(
972 tx: &mut ReadTransaction,
973 branch: Branch,
974 blob_id: BlobId,
975 fallback: DirectoryFallback,
976) -> Result<(Blob, Content)> {
977 let mut root_node = tx
978 .load_latest_approved_root_node(branch.id(), RootNodeFilter::Any)
979 .await?;
980
981 loop {
982 let error = match load_at(tx, &root_node, branch.clone(), blob_id).await {
983 Ok((blob, content)) => return Ok((blob, content)),
984 Err(error @ Error::Store(store::Error::BlockNotFound)) => error,
985 Err(error) => return Err(error),
986 };
987
988 match fallback {
989 DirectoryFallback::Enabled => (),
990 DirectoryFallback::Disabled => return Err(error),
991 }
992
993 if let Some(prev) = tx.load_prev_approved_root_node(&root_node).await? {
994 root_node = prev;
995 } else {
996 return Err(error);
997 }
998 }
999}
1000
1001async fn load_at(
1002 tx: &mut ReadTransaction,
1003 root_node: &RootNode,
1004 branch: Branch,
1005 blob_id: BlobId,
1006) -> Result<(Blob, Content)> {
1007 let mut blob = Blob::open_at(tx, root_node, branch, blob_id).await?;
1008 let buffer = blob.read_to_end_at(tx, root_node).await?;
1009 let content = Content::deserialize(&buffer)?;
1010
1011 Ok((blob, content))
1012}
1013
1014async fn commit(mut tx: WriteTransaction, changeset: Changeset, branch: &Branch) -> Result<()> {
1016 let changed = changeset
1017 .apply(
1018 &mut tx,
1019 branch.id(),
1020 branch.keys().write().ok_or(Error::PermissionDenied)?,
1021 )
1022 .await?;
1023
1024 if !changed {
1025 return Ok(());
1026 }
1027
1028 let event_tx = branch.notify();
1029 tx.commit_and_then(move || event_tx.send()).await?;
1030
1031 Ok(())
1032}