1#[cfg(test)]
2mod tests;
3
4use crate::{
5 branch::Branch,
6 conflict,
7 crypto::sign::PublicKey,
8 directory::{
9 self, Directory, DirectoryFallback, DirectoryRef, EntryRef, EntryTombstoneData, EntryType,
10 FileRef,
11 },
12 error::{Error, Result},
13 file::File,
14 iterator::{Accumulate, SortedUnion},
15 store,
16 version_vector::VersionVector,
17 versioned::{self, PreferBranch},
18};
19use async_recursion::async_recursion;
20use camino::{Utf8Component, Utf8Path};
21use either::Either;
22use std::{
23 borrow::Cow,
24 collections::{BTreeMap, VecDeque},
25 fmt, iter, mem,
26};
27use tracing::{Instrument, instrument};
28
29#[derive(Clone)]
31pub struct JointDirectory {
32 versions: BTreeMap<PublicKey, Directory>,
33 local_branch: Option<Branch>,
34}
35
36impl JointDirectory {
37 pub fn new<I>(local_branch: Option<Branch>, versions: I) -> Self
41 where
42 I: IntoIterator<Item = Directory>,
43 {
44 let versions = versions
45 .into_iter()
46 .map(|dir| (*dir.branch().id(), dir))
47 .collect();
48
49 Self {
50 versions,
51 local_branch,
52 }
53 }
54
55 pub(crate) fn local_version(&self) -> Option<&Directory> {
56 self.local_branch
57 .as_ref()
58 .and_then(|branch| self.versions.get(branch.id()))
59 }
60
61 pub(crate) fn local_version_mut(&mut self) -> Option<&mut Directory> {
62 self.local_branch
63 .as_ref()
64 .and_then(|branch| self.versions.get_mut(branch.id()))
65 }
66
67 pub fn is_empty(&self) -> bool {
68 self.entries().next().is_none()
69 }
70
71 pub fn entries(&self) -> impl Iterator<Item = JointEntryRef<'_>> {
75 self.merge_entries()
76 .flat_map(|(_, merge)| merge.ignore_tombstones())
77 }
78
79 fn merge_entries(&self) -> impl Iterator<Item = (&str, Merge<'_>)> {
80 let entries = self.versions.values().map(|directory| directory.entries());
81 let entries = SortedUnion::new(entries, |entry| entry.name());
82 let entries = Accumulate::new(entries, |entry| entry.name());
83 entries.map(|(name, entries)| {
84 (
85 name,
86 Merge::new(entries.into_iter(), self.local_branch.as_ref()),
87 )
88 })
89 }
90
91 pub fn lookup<'a>(&'a self, name: &'a str) -> impl Iterator<Item = JointEntryRef<'a>> + 'a {
94 Merge::new(
95 self.versions
96 .values()
97 .filter_map(move |dir| dir.lookup(name).ok()),
98 self.local_branch.as_ref(),
99 )
100 .ignore_tombstones()
101 }
102
103 pub fn lookup_unique<'a>(&'a self, name: &'a str) -> Result<JointEntryRef<'a>> {
114 let mut entries =
116 Merge::new(self.entry_versions(name), self.local_branch.as_ref()).ignore_tombstones();
117 if let Some(entry) = entries.next() {
118 if entries.next().is_none() {
119 return Ok(entry);
120 } else {
121 return Err(Error::AmbiguousEntry);
122 }
123 }
124
125 let (name, branch_id_prefix) = conflict::parse_unique_name(name);
128 let branch_id_prefix = branch_id_prefix.ok_or(Error::EntryNotFound)?;
129
130 let mut entries = Merge::new(self.entry_versions(name), self.local_branch.as_ref())
131 .ignore_tombstones()
132 .filter(|entry| entry.first_branch().id().starts_with(&branch_id_prefix));
133
134 let first = entries.next().ok_or(Error::EntryNotFound)?;
135
136 if entries.next().is_none() {
137 Ok(first)
138 } else {
139 Err(Error::AmbiguousEntry)
140 }
141 }
142
143 #[instrument(skip(self), err(Debug))]
145 pub fn lookup_version<'a>(
146 &'a self,
147 name: &'a str,
148 branch_id: &'a PublicKey,
149 ) -> Result<FileRef<'a>> {
150 self.versions
151 .get(branch_id)
152 .ok_or(Error::EntryNotFound)
153 .and_then(|dir| dir.lookup(name))
154 .and_then(|entry| entry.file())
155 }
156
157 #[allow(clippy::len_without_is_empty)]
160 pub fn len(&self) -> u64 {
161 self.versions.values().map(|dir| dir.len()).sum()
162 }
163
164 pub fn has_local_version(&self) -> bool {
165 self.local_branch
166 .as_ref()
167 .map(|local_branch| self.versions.contains_key(local_branch.id()))
168 .unwrap_or(false)
169 }
170
171 pub async fn cd(&self, path: impl AsRef<Utf8Path>) -> Result<Self> {
175 let mut curr = Cow::Borrowed(self);
176
177 for component in path.as_ref().components() {
178 match component {
179 Utf8Component::RootDir | Utf8Component::CurDir => (),
180 Utf8Component::Normal(name) => {
181 let next = curr
182 .lookup(name)
183 .find_map(|entry| entry.directory().ok())
184 .ok_or(Error::EntryNotFound)?
185 .open()
186 .await?;
187 curr = Cow::Owned(next);
188 }
189 Utf8Component::ParentDir | Utf8Component::Prefix(_) => {
190 return Err(Error::OperationNotSupported);
191 }
192 }
193 }
194
195 Ok(curr.into_owned())
196 }
197
198 pub async fn remove_entry(&mut self, name: &str) -> Result<()> {
201 self.remove_entries(Pattern::Unique(name)).await
202 }
203
204 pub async fn remove_entry_recursively(&mut self, name: &str) -> Result<()> {
207 self.remove_entries_recursively(Pattern::Unique(name)).await
208 }
209
210 async fn remove_entries(&mut self, pattern: Pattern<'_>) -> Result<()> {
211 let local_branch = self.local_branch.as_ref().ok_or(Error::PermissionDenied)?;
212
213 let entries: Vec<_> = pattern
214 .apply(self)?
215 .map(|entry| {
216 let name = entry.name().to_owned();
217 let branch_id = match &entry {
218 JointEntryRef::File(entry) => *entry.branch().id(),
219 JointEntryRef::Directory(_) => *local_branch.id(),
220 };
221 let vv = entry.version_vector().into_owned();
222
223 (name, branch_id, vv)
224 })
225 .collect();
226
227 let local_version = self.fork().await?;
228
229 for (name, branch_id, vv) in entries {
230 local_version.remove_entry(&name, &branch_id, vv).await?;
231 }
232
233 Ok(())
234 }
235
236 #[async_recursion]
237 async fn remove_entries_recursively<'a>(&'a mut self, pattern: Pattern<'a>) -> Result<()> {
238 for entry in pattern.apply(self)?.filter_map(|e| e.directory().ok()) {
239 let mut dir = entry
240 .open_with(MissingVersionStrategy::Skip, DirectoryFallback::Disabled)
241 .await?;
242 dir.remove_entries_recursively(Pattern::All).await?;
243 }
244
245 if let Some(local_version) = self.local_version_mut() {
246 local_version.refresh().await?;
247 }
248
249 self.remove_entries(pattern).await
250 }
251
252 #[async_recursion]
263 pub async fn merge<'a>(&'a mut self) -> Result<(MergeStatus, &'a Directory)> {
264 let old_version_vector = if let Some(local_version) = self.local_version() {
265 local_version.version_vector().await?
266 } else {
267 VersionVector::new()
268 };
269
270 let new_version_vector = self.merge_version_vectors().await?;
271
272 if !old_version_vector.is_empty() && old_version_vector >= new_version_vector {
273 tracing::trace!(old = ?old_version_vector, "Merge not started - already up to date");
275 return Ok((MergeStatus::Unchanged, self.local_version().unwrap()));
278 } else {
279 tracing::trace!(old = ?old_version_vector, new = ?new_version_vector, "Merge started");
280 }
281
282 let local_version = self.fork().await?;
283 let local_branch = local_version.branch().clone();
284
285 let mut conflict = false;
286 let mut check_for_removal = Vec::new();
287
288 for (name, merge) in self.merge_entries() {
289 match merge {
290 Merge::Existing(existing) => {
291 for entry in existing {
292 match entry {
293 JointEntryRef::File(entry) => {
294 match entry.fork(&local_branch).await {
295 Ok(()) => {}
296 Err(Error::EntryExists) => {
297 conflict = true;
302 }
303 Err(error) => return Err(error),
304 }
305 }
306 JointEntryRef::Directory(entry) => {
307 let mut dir = entry
308 .open_with(
309 MissingVersionStrategy::Fail,
310 DirectoryFallback::Disabled,
311 )
312 .await?;
313 match dir
314 .merge()
315 .instrument(tracing::info_span!("dir", message = name))
316 .await
317 {
318 Ok((MergeStatus::Conflict, _)) => {
319 conflict = true;
320 }
321 Ok((MergeStatus::Completed | MergeStatus::Unchanged, _)) => (),
322 Err(error) => return Err(error),
323 }
324 }
325 }
326 }
327 }
328 Merge::Tombstone(tombstone) => {
329 check_for_removal.push((name.to_owned(), tombstone));
330 }
331 }
332 }
333
334 let local_version = self.local_version_mut().unwrap();
337 local_version.refresh().await?;
338
339 for (name, tombstone) in check_for_removal {
340 local_version.create_tombstone(&name, tombstone).await?;
341 }
342
343 if !conflict && local_version.is_root() {
346 directory::bump_root(&local_branch, new_version_vector).await?;
347 }
348
349 if tracing::enabled!(tracing::Level::TRACE) {
350 let vv = local_version.version_vector().await?;
351 tracing::trace!(?vv, ?conflict, "Merge completed");
352 }
353
354 Ok((
355 if conflict {
356 MergeStatus::Conflict
357 } else {
358 MergeStatus::Completed
359 },
360 local_version,
361 ))
362 }
363
364 async fn merge_version_vectors(&self) -> Result<VersionVector> {
366 let mut outcome = VersionVector::new();
367
368 for version in self.versions.values() {
369 outcome.merge(&version.version_vector().await?);
370 }
371
372 Ok(outcome)
373 }
374
375 async fn fork(&mut self) -> Result<&mut Directory> {
376 let local_branch = self.local_branch.as_ref().ok_or(Error::PermissionDenied)?;
377
378 let mut local_version = None;
379
380 for (branch_id, version) in &self.versions {
383 if branch_id == local_branch.id() {
384 continue;
385 }
386
387 local_version = Some(version.fork(local_branch).await?);
388 }
389
390 if let Some(local_version) = local_version {
391 self.versions.insert(*local_branch.id(), local_version);
392 }
393
394 self.versions
397 .get_mut(local_branch.id())
398 .ok_or(Error::EntryNotFound)
399 }
400
401 fn entry_versions<'a>(&'a self, name: &'a str) -> impl Iterator<Item = EntryRef<'a>> {
402 self.versions
403 .values()
404 .filter_map(move |v| v.lookup(name).ok())
405 }
406}
407
408impl fmt::Debug for JointDirectory {
409 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
410 f.debug_struct("JointDirectory").finish()
411 }
412}
413
414#[derive(Debug)]
415pub enum JointEntryRef<'a> {
416 File(JointFileRef<'a>),
417 Directory(JointDirectoryRef<'a>),
418}
419
420impl<'a> JointEntryRef<'a> {
421 pub fn name(&self) -> &'a str {
422 match self {
423 Self::File(r) => r.name(),
424 Self::Directory(r) => r.name(),
425 }
426 }
427
428 pub fn unique_name(&self) -> Cow<'a, str> {
429 match self {
430 Self::File(r) => r.unique_name(),
431 Self::Directory(r) => r.unique_name(),
432 }
433 }
434
435 pub fn entry_type(&self) -> EntryType {
436 match self {
437 Self::File(_) => EntryType::File,
438 Self::Directory(_) => EntryType::Directory,
439 }
440 }
441
442 pub fn version_vector(&'a self) -> Cow<'a, VersionVector> {
443 match self {
444 Self::File(r) => Cow::Borrowed(r.version_vector()),
445 Self::Directory(r) => Cow::Owned(r.version_vector()),
446 }
447 }
448
449 pub fn file(self) -> Result<FileRef<'a>> {
450 match self {
451 Self::File(r) => Ok(r.file),
452 Self::Directory(_) => Err(Error::EntryIsDirectory),
453 }
454 }
455
456 pub fn directory(self) -> Result<JointDirectoryRef<'a>> {
457 match self {
458 Self::Directory(r) => Ok(r),
459 Self::File(_) => Err(Error::EntryIsFile),
460 }
461 }
462
463 fn first_branch(&self) -> &Branch {
464 match self {
465 Self::File(r) => r.branch(),
466 Self::Directory(r) => r.first_version().branch(),
467 }
468 }
469}
470
471#[derive(Debug)]
472pub struct JointFileRef<'a> {
473 file: FileRef<'a>,
474 needs_disambiguation: bool,
475}
476
477impl<'a> JointFileRef<'a> {
478 pub fn name(&self) -> &'a str {
479 self.file.name()
480 }
481
482 pub fn unique_name(&self) -> Cow<'a, str> {
483 if self.needs_disambiguation {
484 Cow::from(conflict::create_unique_name(
485 self.name(),
486 self.file.branch().id(),
487 ))
488 } else {
489 Cow::from(self.name())
490 }
491 }
492
493 pub async fn open(&self) -> Result<File> {
494 self.file.open().await
495 }
496
497 pub(crate) async fn fork(&self, dst_branch: &Branch) -> Result<()> {
498 self.file.fork(dst_branch).await
499 }
500
501 pub fn version_vector(&self) -> &'a VersionVector {
502 self.file.version_vector()
503 }
504
505 pub fn branch(&self) -> &Branch {
506 self.file.branch()
507 }
508
509 pub fn parent(&self) -> &Directory {
510 self.file.parent()
511 }
512
513 pub fn inner(&self) -> FileRef<'a> {
514 self.file
515 }
516}
517
518pub struct JointDirectoryRef<'a> {
519 versions: Vec<DirectoryRef<'a>>,
520 local_branch: Option<&'a Branch>,
521 needs_disambiguation: bool,
522}
523
524impl<'a> JointDirectoryRef<'a> {
525 fn new(
526 versions: Vec<DirectoryRef<'a>>,
527 local_branch: Option<&'a Branch>,
528 needs_disambiguation: bool,
529 ) -> Option<Self> {
530 if versions.is_empty() {
531 None
532 } else {
533 Some(Self {
534 versions,
535 local_branch,
536 needs_disambiguation,
537 })
538 }
539 }
540
541 pub fn name(&self) -> &'a str {
542 self.first_version().name()
543 }
544
545 pub fn unique_name(&self) -> Cow<'a, str> {
546 if self.needs_disambiguation {
547 Cow::from(conflict::create_unique_name(
548 self.name(),
549 self.first_version().branch().id(),
550 ))
551 } else {
552 Cow::from(self.name())
553 }
554 }
555
556 pub fn version_vector(&self) -> VersionVector {
557 self.versions
558 .iter()
559 .fold(VersionVector::new(), |mut vv, dir| {
560 vv.merge(dir.version_vector());
561 vv
562 })
563 }
564
565 pub async fn open(&self) -> Result<JointDirectory> {
566 self.open_with(MissingVersionStrategy::Skip, DirectoryFallback::Enabled)
567 .await
568 }
569
570 pub(crate) async fn open_with(
571 &self,
572 missing_version_strategy: MissingVersionStrategy,
573 fallback: DirectoryFallback,
574 ) -> Result<JointDirectory> {
575 let mut versions = Vec::new();
576 for version in &self.versions {
577 match version.open(fallback).await {
578 Ok(open_dir) => versions.push(open_dir),
579 Err(e)
580 if self
581 .local_branch
582 .map(|local_branch| version.branch().id() == local_branch.id())
583 .unwrap_or(false) =>
584 {
585 return Err(e);
586 }
587 Err(Error::Store(store::Error::BlockNotFound))
588 if matches!(missing_version_strategy, MissingVersionStrategy::Skip) =>
589 {
590 continue;
594 }
595 Err(e) => return Err(e),
596 }
597 }
598
599 Ok(JointDirectory::new(self.local_branch.cloned(), versions))
600 }
601
602 pub(crate) fn versions(&self) -> &[DirectoryRef<'_>] {
603 &self.versions
604 }
605
606 fn first_version(&self) -> &DirectoryRef<'a> {
607 self.versions
608 .first()
609 .expect("joint directory must contain at least one directory")
610 }
611}
612
613impl fmt::Debug for JointDirectoryRef<'_> {
614 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
615 f.debug_struct("JointDirectoryRef")
616 .field("name", &self.name())
617 .finish()
618 }
619}
620
621#[derive(Copy, Clone)]
623pub enum MissingVersionStrategy {
624 Skip,
626 Fail,
628}
629
630#[derive(Copy, Clone, Debug)]
632pub enum MergeStatus {
633 Completed,
635 Conflict,
637 Unchanged,
639}
640
641#[derive(Clone)]
646enum Merge<'a> {
647 Existing(Existing<'a>),
649 Tombstone(EntryTombstoneData),
651}
652
653#[derive(Default, Clone)]
654struct Existing<'a> {
655 files: VecDeque<FileRef<'a>>,
659 directories: Vec<DirectoryRef<'a>>,
660 needs_disambiguation: bool,
661 local_branch: Option<&'a Branch>,
662}
663
664impl<'a> Iterator for Existing<'a> {
665 type Item = JointEntryRef<'a>;
666
667 fn next(&mut self) -> Option<Self::Item> {
668 if let Some(dir) = JointDirectoryRef::new(
669 mem::take(&mut self.directories),
670 self.local_branch,
671 self.needs_disambiguation,
672 ) {
673 return Some(JointEntryRef::Directory(dir));
674 }
675
676 Some(JointEntryRef::File(JointFileRef {
677 file: self.files.pop_front()?,
678 needs_disambiguation: self.needs_disambiguation,
679 }))
680 }
681}
682
683impl<'a> Merge<'a> {
684 fn new<I>(entries: I, local_branch: Option<&'a Branch>) -> Self
687 where
688 I: Iterator<Item = EntryRef<'a>>,
689 {
690 let mut files = VecDeque::new();
691 let mut directories = vec![];
692 let mut tombstone: Option<EntryTombstoneData> = None;
693
694 let entries = versioned::keep_maximal(entries, PreferBranch(local_branch.map(Branch::id)));
696
697 for entry in entries {
698 match entry {
699 EntryRef::File(file) => files.push_back(file),
700 EntryRef::Directory(dir) => directories.push(dir),
701 EntryRef::Tombstone(_) if !files.is_empty() || !directories.is_empty() => continue,
702 EntryRef::Tombstone(new_tombstone) => {
703 let new_tombstone = if let Some(mut old_tombstone) = tombstone.take() {
704 old_tombstone.merge(new_tombstone.data());
705 old_tombstone
706 } else {
707 new_tombstone.data().clone()
708 };
709
710 tombstone = Some(new_tombstone);
711 }
712 }
713 }
714
715 let needs_disambiguation = files.len() + if directories.is_empty() { 0 } else { 1 } > 1;
716
717 match tombstone {
718 Some(tombstone) if files.is_empty() && directories.is_empty() => {
719 Self::Tombstone(tombstone)
720 }
721 Some(_) | None => Self::Existing(Existing {
722 files,
723 directories,
724 needs_disambiguation,
725 local_branch,
726 }),
727 }
728 }
729
730 fn ignore_tombstones(self) -> Existing<'a> {
731 match self {
732 Self::Existing(existing) => existing,
733 Self::Tombstone(_) => Existing::default(),
734 }
735 }
736}
737
738enum Pattern<'a> {
739 All,
741 Unique(&'a str),
743}
744
745impl<'a> Pattern<'a> {
746 fn apply(&self, dir: &'a JointDirectory) -> Result<impl Iterator<Item = JointEntryRef<'a>>> {
747 match self {
748 Self::All => Ok(Either::Left(dir.entries())),
749 Self::Unique(name) => dir
750 .lookup_unique(name)
751 .map(|entry| Either::Right(iter::once(entry))),
752 }
753 }
754}