1#[cfg(test)]
2mod tests;
3
4use crate::{
5 branch::Branch,
6 conflict,
7 crypto::sign::PublicKey,
8 directory::{
9 self, Directory, DirectoryFallback, DirectoryRef, EntryRef, EntryTombstoneData, EntryType,
10 FileRef,
11 },
12 error::{Error, Result},
13 file::File,
14 iterator::{Accumulate, SortedUnion},
15 store,
16 version_vector::VersionVector,
17 versioned::{self, PreferBranch},
18};
19use async_recursion::async_recursion;
20use camino::{Utf8Component, Utf8Path};
21use either::Either;
22use std::{
23 borrow::Cow,
24 collections::{BTreeMap, VecDeque},
25 fmt, iter, mem,
26};
27use tracing::{instrument, Instrument};
28
29#[derive(Clone)]
31pub struct JointDirectory {
32 versions: BTreeMap<PublicKey, Directory>,
33 local_branch: Option<Branch>,
34}
35
36impl JointDirectory {
37 pub fn new<I>(local_branch: Option<Branch>, versions: I) -> Self
41 where
42 I: IntoIterator<Item = Directory>,
43 {
44 let versions = versions
45 .into_iter()
46 .map(|dir| (*dir.branch().id(), dir))
47 .collect();
48
49 Self {
50 versions,
51 local_branch,
52 }
53 }
54
55 pub(crate) fn local_version(&self) -> Option<&Directory> {
56 self.local_branch
57 .as_ref()
58 .and_then(|branch| self.versions.get(branch.id()))
59 }
60
61 pub(crate) fn local_version_mut(&mut self) -> Option<&mut Directory> {
62 self.local_branch
63 .as_ref()
64 .and_then(|branch| self.versions.get_mut(branch.id()))
65 }
66
67 pub fn is_empty(&self) -> bool {
68 self.entries().next().is_none()
69 }
70
71 pub fn entries(&self) -> impl Iterator<Item = JointEntryRef<'_>> {
75 self.merge_entries()
76 .flat_map(|(_, merge)| merge.ignore_tombstones())
77 }
78
79 fn merge_entries(&self) -> impl Iterator<Item = (&str, Merge<'_>)> {
80 let entries = self.versions.values().map(|directory| directory.entries());
81 let entries = SortedUnion::new(entries, |entry| entry.name());
82 let entries = Accumulate::new(entries, |entry| entry.name());
83 entries.map(|(name, entries)| {
84 (
85 name,
86 Merge::new(entries.into_iter(), self.local_branch.as_ref()),
87 )
88 })
89 }
90
91 pub fn lookup<'a>(&'a self, name: &'a str) -> impl Iterator<Item = JointEntryRef<'a>> + 'a {
94 Merge::new(
95 self.versions
96 .values()
97 .filter_map(move |dir| dir.lookup(name).ok()),
98 self.local_branch.as_ref(),
99 )
100 .ignore_tombstones()
101 }
102
103 pub fn lookup_unique<'a>(&'a self, name: &'a str) -> Result<JointEntryRef<'a>> {
114 let mut entries =
116 Merge::new(self.entry_versions(name), self.local_branch.as_ref()).ignore_tombstones();
117 if let Some(entry) = entries.next() {
118 if entries.next().is_none() {
119 return Ok(entry);
120 } else {
121 return Err(Error::AmbiguousEntry);
122 }
123 }
124
125 let (name, branch_id_prefix) = conflict::parse_unique_name(name);
128 let branch_id_prefix = branch_id_prefix.ok_or(Error::EntryNotFound)?;
129
130 let mut entries = Merge::new(self.entry_versions(name), self.local_branch.as_ref())
131 .ignore_tombstones()
132 .filter(|entry| entry.first_branch().id().starts_with(&branch_id_prefix));
133
134 let first = entries.next().ok_or(Error::EntryNotFound)?;
135
136 if entries.next().is_none() {
137 Ok(first)
138 } else {
139 Err(Error::AmbiguousEntry)
140 }
141 }
142
143 #[instrument(skip(self), err(Debug))]
145 pub fn lookup_version<'a>(
146 &'a self,
147 name: &'a str,
148 branch_id: &'a PublicKey,
149 ) -> Result<FileRef<'a>> {
150 self.versions
151 .get(branch_id)
152 .ok_or(Error::EntryNotFound)
153 .and_then(|dir| dir.lookup(name))
154 .and_then(|entry| entry.file())
155 }
156
157 #[allow(clippy::len_without_is_empty)]
160 pub fn len(&self) -> u64 {
161 self.versions.values().map(|dir| dir.len()).sum()
162 }
163
164 pub fn has_local_version(&self) -> bool {
165 self.local_branch
166 .as_ref()
167 .map(|local_branch| self.versions.contains_key(local_branch.id()))
168 .unwrap_or(false)
169 }
170
171 pub async fn cd(&self, path: impl AsRef<Utf8Path>) -> Result<Self> {
175 let mut curr = Cow::Borrowed(self);
176
177 for component in path.as_ref().components() {
178 match component {
179 Utf8Component::RootDir | Utf8Component::CurDir => (),
180 Utf8Component::Normal(name) => {
181 let next = curr
182 .lookup(name)
183 .find_map(|entry| entry.directory().ok())
184 .ok_or(Error::EntryNotFound)?
185 .open()
186 .await?;
187 curr = Cow::Owned(next);
188 }
189 Utf8Component::ParentDir | Utf8Component::Prefix(_) => {
190 return Err(Error::OperationNotSupported)
191 }
192 }
193 }
194
195 Ok(curr.into_owned())
196 }
197
198 pub async fn remove_entry(&mut self, name: &str) -> Result<()> {
201 self.remove_entries(Pattern::Unique(name)).await
202 }
203
204 pub async fn remove_entry_recursively(&mut self, name: &str) -> Result<()> {
207 self.remove_entries_recursively(Pattern::Unique(name)).await
208 }
209
210 async fn remove_entries(&mut self, pattern: Pattern<'_>) -> Result<()> {
211 let local_branch = self.local_branch.as_ref().ok_or(Error::PermissionDenied)?;
212
213 let entries: Vec<_> = pattern
214 .apply(self)?
215 .map(|entry| {
216 let name = entry.name().to_owned();
217 let branch_id = match &entry {
218 JointEntryRef::File(entry) => *entry.branch().id(),
219 JointEntryRef::Directory(_) => *local_branch.id(),
220 };
221 let vv = entry.version_vector().into_owned();
222
223 (name, branch_id, vv)
224 })
225 .collect();
226
227 let local_version = self.fork().await?;
228
229 for (name, branch_id, vv) in entries {
230 local_version.remove_entry(&name, &branch_id, vv).await?;
231 }
232
233 Ok(())
234 }
235
236 #[async_recursion]
237 async fn remove_entries_recursively<'a>(&'a mut self, pattern: Pattern<'a>) -> Result<()> {
238 for entry in pattern.apply(self)?.filter_map(|e| e.directory().ok()) {
239 let mut dir = entry
240 .open_with(MissingVersionStrategy::Skip, DirectoryFallback::Disabled)
241 .await?;
242 dir.remove_entries_recursively(Pattern::All).await?;
243 }
244
245 if let Some(local_version) = self.local_version_mut() {
246 local_version.refresh().await?;
247 }
248
249 self.remove_entries(pattern).await
250 }
251
252 #[async_recursion]
258 pub async fn merge(&mut self) -> Result<Directory> {
259 let old_version_vector = if let Some(local_version) = self.local_version() {
260 local_version.version_vector().await?
261 } else {
262 VersionVector::new()
263 };
264
265 let new_version_vector = self.merge_version_vectors().await?;
266
267 if !old_version_vector.is_empty() && old_version_vector >= new_version_vector {
268 tracing::trace!(old = ?old_version_vector, "Merge not started - already up to date");
270 return Ok(self.local_version().unwrap().clone());
273 } else {
274 tracing::trace!(old = ?old_version_vector, new = ?new_version_vector, "Merge started");
275 }
276
277 let local_version = self.fork().await?;
278 let local_branch = local_version.branch().clone();
279
280 let mut conflict = false;
281 let mut check_for_removal = Vec::new();
282
283 for (name, merge) in self.merge_entries() {
284 match merge {
285 Merge::Existing(existing) => {
286 for entry in existing {
287 match entry {
288 JointEntryRef::File(entry) => {
289 match entry.fork(&local_branch).await {
290 Ok(()) => {}
291 Err(Error::EntryExists) => {
292 conflict = true;
297 }
298 Err(error) => return Err(error),
299 }
300 }
301 JointEntryRef::Directory(entry) => {
302 let mut dir = entry
303 .open_with(
304 MissingVersionStrategy::Fail,
305 DirectoryFallback::Disabled,
306 )
307 .await?;
308 match dir
309 .merge()
310 .instrument(tracing::info_span!("dir", message = name))
311 .await
312 {
313 Ok(_) => (),
314 Err(Error::AmbiguousEntry) => {
315 conflict = true;
316 }
317 Err(error) => return Err(error),
318 }
319 }
320 }
321 }
322 }
323 Merge::Tombstone(tombstone) => {
324 check_for_removal.push((name.to_owned(), tombstone));
325 }
326 }
327 }
328
329 let local_version = self.local_version_mut().unwrap();
332 local_version.refresh().await?;
333
334 for (name, tombstone) in check_for_removal {
335 local_version.create_tombstone(&name, tombstone).await?;
336 }
337
338 if !conflict && local_version.is_root() {
341 directory::bump_root(&local_branch, new_version_vector).await?;
342 }
343
344 if tracing::enabled!(tracing::Level::TRACE) {
345 let vv = local_version.version_vector().await?;
346 tracing::trace!(?vv, ?conflict, "Merge completed");
347 }
348
349 if conflict {
350 Err(Error::AmbiguousEntry)
351 } else {
352 Ok(local_version.clone())
353 }
354 }
355
356 async fn merge_version_vectors(&self) -> Result<VersionVector> {
358 let mut outcome = VersionVector::new();
359
360 for version in self.versions.values() {
361 outcome.merge(&version.version_vector().await?);
362 }
363
364 Ok(outcome)
365 }
366
367 async fn fork(&mut self) -> Result<&mut Directory> {
368 let local_branch = self.local_branch.as_ref().ok_or(Error::PermissionDenied)?;
369
370 let mut local_version = None;
371
372 for (branch_id, version) in &self.versions {
375 if branch_id == local_branch.id() {
376 continue;
377 }
378
379 local_version = Some(version.fork(local_branch).await?);
380 }
381
382 if let Some(local_version) = local_version {
383 self.versions.insert(*local_branch.id(), local_version);
384 }
385
386 self.versions
389 .get_mut(local_branch.id())
390 .ok_or(Error::EntryNotFound)
391 }
392
393 fn entry_versions<'a>(&'a self, name: &'a str) -> impl Iterator<Item = EntryRef<'a>> {
394 self.versions
395 .values()
396 .filter_map(move |v| v.lookup(name).ok())
397 }
398}
399
400impl fmt::Debug for JointDirectory {
401 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
402 f.debug_struct("JointDirectory").finish()
403 }
404}
405
406#[derive(Debug)]
407pub enum JointEntryRef<'a> {
408 File(JointFileRef<'a>),
409 Directory(JointDirectoryRef<'a>),
410}
411
412impl<'a> JointEntryRef<'a> {
413 pub fn name(&self) -> &'a str {
414 match self {
415 Self::File(r) => r.name(),
416 Self::Directory(r) => r.name(),
417 }
418 }
419
420 pub fn unique_name(&self) -> Cow<'a, str> {
421 match self {
422 Self::File(r) => r.unique_name(),
423 Self::Directory(r) => r.unique_name(),
424 }
425 }
426
427 pub fn entry_type(&self) -> EntryType {
428 match self {
429 Self::File(_) => EntryType::File,
430 Self::Directory(_) => EntryType::Directory,
431 }
432 }
433
434 pub fn version_vector(&'a self) -> Cow<'a, VersionVector> {
435 match self {
436 Self::File(r) => Cow::Borrowed(r.version_vector()),
437 Self::Directory(r) => Cow::Owned(r.version_vector()),
438 }
439 }
440
441 pub fn file(self) -> Result<FileRef<'a>> {
442 match self {
443 Self::File(r) => Ok(r.file),
444 Self::Directory(_) => Err(Error::EntryIsDirectory),
445 }
446 }
447
448 pub fn directory(self) -> Result<JointDirectoryRef<'a>> {
449 match self {
450 Self::Directory(r) => Ok(r),
451 Self::File(_) => Err(Error::EntryIsFile),
452 }
453 }
454
455 fn first_branch(&self) -> &Branch {
456 match self {
457 Self::File(r) => r.branch(),
458 Self::Directory(r) => r.first_version().branch(),
459 }
460 }
461}
462
463#[derive(Debug)]
464pub struct JointFileRef<'a> {
465 file: FileRef<'a>,
466 needs_disambiguation: bool,
467}
468
469impl<'a> JointFileRef<'a> {
470 pub fn name(&self) -> &'a str {
471 self.file.name()
472 }
473
474 pub fn unique_name(&self) -> Cow<'a, str> {
475 if self.needs_disambiguation {
476 Cow::from(conflict::create_unique_name(
477 self.name(),
478 self.file.branch().id(),
479 ))
480 } else {
481 Cow::from(self.name())
482 }
483 }
484
485 pub async fn open(&self) -> Result<File> {
486 self.file.open().await
487 }
488
489 pub(crate) async fn fork(&self, dst_branch: &Branch) -> Result<()> {
490 self.file.fork(dst_branch).await
491 }
492
493 pub fn version_vector(&self) -> &'a VersionVector {
494 self.file.version_vector()
495 }
496
497 pub fn branch(&self) -> &Branch {
498 self.file.branch()
499 }
500
501 pub fn parent(&self) -> &Directory {
502 self.file.parent()
503 }
504
505 pub fn inner(&self) -> FileRef<'a> {
506 self.file
507 }
508}
509
510pub struct JointDirectoryRef<'a> {
511 versions: Vec<DirectoryRef<'a>>,
512 local_branch: Option<&'a Branch>,
513 needs_disambiguation: bool,
514}
515
516impl<'a> JointDirectoryRef<'a> {
517 fn new(
518 versions: Vec<DirectoryRef<'a>>,
519 local_branch: Option<&'a Branch>,
520 needs_disambiguation: bool,
521 ) -> Option<Self> {
522 if versions.is_empty() {
523 None
524 } else {
525 Some(Self {
526 versions,
527 local_branch,
528 needs_disambiguation,
529 })
530 }
531 }
532
533 pub fn name(&self) -> &'a str {
534 self.first_version().name()
535 }
536
537 pub fn unique_name(&self) -> Cow<'a, str> {
538 if self.needs_disambiguation {
539 Cow::from(conflict::create_unique_name(
540 self.name(),
541 self.first_version().branch().id(),
542 ))
543 } else {
544 Cow::from(self.name())
545 }
546 }
547
548 pub fn version_vector(&self) -> VersionVector {
549 self.versions
550 .iter()
551 .fold(VersionVector::new(), |mut vv, dir| {
552 vv.merge(dir.version_vector());
553 vv
554 })
555 }
556
557 pub async fn open(&self) -> Result<JointDirectory> {
558 self.open_with(MissingVersionStrategy::Skip, DirectoryFallback::Enabled)
559 .await
560 }
561
562 pub(crate) async fn open_with(
563 &self,
564 missing_version_strategy: MissingVersionStrategy,
565 fallback: DirectoryFallback,
566 ) -> Result<JointDirectory> {
567 let mut versions = Vec::new();
568 for version in &self.versions {
569 match version.open(fallback).await {
570 Ok(open_dir) => versions.push(open_dir),
571 Err(e)
572 if self
573 .local_branch
574 .map(|local_branch| version.branch().id() == local_branch.id())
575 .unwrap_or(false) =>
576 {
577 return Err(e)
578 }
579 Err(Error::Store(store::Error::BlockNotFound))
580 if matches!(missing_version_strategy, MissingVersionStrategy::Skip) =>
581 {
582 continue;
586 }
587 Err(e) => return Err(e),
588 }
589 }
590
591 Ok(JointDirectory::new(self.local_branch.cloned(), versions))
592 }
593
594 pub(crate) fn versions(&self) -> &[DirectoryRef<'_>] {
595 &self.versions
596 }
597
598 fn first_version(&self) -> &DirectoryRef<'a> {
599 self.versions
600 .first()
601 .expect("joint directory must contain at least one directory")
602 }
603}
604
605impl fmt::Debug for JointDirectoryRef<'_> {
606 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
607 f.debug_struct("JointDirectoryRef")
608 .field("name", &self.name())
609 .finish()
610 }
611}
612
/// What `JointDirectoryRef::open_with` does with a non-local version that fails to open with
/// a `BlockNotFound` store error.
#[derive(Copy, Clone)]
pub enum MissingVersionStrategy {
    /// Leave the version out and carry on with the remaining ones.
    Skip,
    /// Abort and return the error.
    Fail,
}
621
622#[derive(Clone)]
627enum Merge<'a> {
628 Existing(Existing<'a>),
630 Tombstone(EntryTombstoneData),
632}
633
634#[derive(Default, Clone)]
635struct Existing<'a> {
636 files: VecDeque<FileRef<'a>>,
640 directories: Vec<DirectoryRef<'a>>,
641 needs_disambiguation: bool,
642 local_branch: Option<&'a Branch>,
643}
644
645impl<'a> Iterator for Existing<'a> {
646 type Item = JointEntryRef<'a>;
647
648 fn next(&mut self) -> Option<Self::Item> {
649 if let Some(dir) = JointDirectoryRef::new(
650 mem::take(&mut self.directories),
651 self.local_branch,
652 self.needs_disambiguation,
653 ) {
654 return Some(JointEntryRef::Directory(dir));
655 }
656
657 Some(JointEntryRef::File(JointFileRef {
658 file: self.files.pop_front()?,
659 needs_disambiguation: self.needs_disambiguation,
660 }))
661 }
662}
663
664impl<'a> Merge<'a> {
665 fn new<I>(entries: I, local_branch: Option<&'a Branch>) -> Self
668 where
669 I: Iterator<Item = EntryRef<'a>>,
670 {
671 let mut files = VecDeque::new();
672 let mut directories = vec![];
673 let mut tombstone: Option<EntryTombstoneData> = None;
674
675 let entries = versioned::keep_maximal(entries, PreferBranch(local_branch.map(Branch::id)));
677
678 for entry in entries {
679 match entry {
680 EntryRef::File(file) => files.push_back(file),
681 EntryRef::Directory(dir) => directories.push(dir),
682 EntryRef::Tombstone(_) if !files.is_empty() || !directories.is_empty() => continue,
683 EntryRef::Tombstone(new_tombstone) => {
684 let new_tombstone = if let Some(mut old_tombstone) = tombstone.take() {
685 old_tombstone.merge(new_tombstone.data());
686 old_tombstone
687 } else {
688 new_tombstone.data().clone()
689 };
690
691 tombstone = Some(new_tombstone);
692 }
693 }
694 }
695
696 let needs_disambiguation = files.len() + if directories.is_empty() { 0 } else { 1 } > 1;
697
698 match tombstone {
699 Some(tombstone) if files.is_empty() && directories.is_empty() => {
700 Self::Tombstone(tombstone)
701 }
702 Some(_) | None => Self::Existing(Existing {
703 files,
704 directories,
705 needs_disambiguation,
706 local_branch,
707 }),
708 }
709 }
710
711 fn ignore_tombstones(self) -> Existing<'a> {
712 match self {
713 Self::Existing(existing) => existing,
714 Self::Tombstone(_) => Existing::default(),
715 }
716 }
717}
718
/// Selects which entries of a [`JointDirectory`] an operation applies to.
enum Pattern<'a> {
    /// Every (non-tombstone) entry.
    All,
    /// The single entry the given name resolves to via `lookup_unique`.
    Unique(&'a str),
}
725
726impl<'a> Pattern<'a> {
727 fn apply(&self, dir: &'a JointDirectory) -> Result<impl Iterator<Item = JointEntryRef<'a>>> {
728 match self {
729 Self::All => Ok(Either::Left(dir.entries())),
730 Self::Unique(name) => dir
731 .lookup_unique(name)
732 .map(|entry| Either::Right(iter::once(entry))),
733 }
734 }
735}