1#[cfg(test)]
2mod tests;
3
4use crate::{
5 branch::Branch,
6 conflict,
7 crypto::sign::PublicKey,
8 directory::{
9 self, Directory, DirectoryFallback, DirectoryRef, EntryRef, EntryTombstoneData, EntryType,
10 FileRef,
11 },
12 error::{Error, Result},
13 file::File,
14 iterator::{Accumulate, SortedUnion},
15 store,
16 version_vector::VersionVector,
17 versioned::{self, PreferBranch},
18};
19use async_recursion::async_recursion;
20use camino::{Utf8Component, Utf8Path};
21use either::Either;
22use std::{
23 borrow::Cow,
24 collections::{BTreeMap, VecDeque},
25 fmt, iter, mem,
26};
27use tracing::{instrument, Instrument};
28
29#[derive(Clone)]
31pub struct JointDirectory {
32 versions: BTreeMap<PublicKey, Directory>,
33 local_branch: Option<Branch>,
34}
35
36impl JointDirectory {
37 pub fn new<I>(local_branch: Option<Branch>, versions: I) -> Self
41 where
42 I: IntoIterator<Item = Directory>,
43 {
44 let versions = versions
45 .into_iter()
46 .map(|dir| (*dir.branch().id(), dir))
47 .collect();
48
49 Self {
50 versions,
51 local_branch,
52 }
53 }
54
55 pub(crate) fn local_version(&self) -> Option<&Directory> {
56 self.local_branch
57 .as_ref()
58 .and_then(|branch| self.versions.get(branch.id()))
59 }
60
61 pub(crate) fn local_version_mut(&mut self) -> Option<&mut Directory> {
62 self.local_branch
63 .as_ref()
64 .and_then(|branch| self.versions.get_mut(branch.id()))
65 }
66
67 pub fn is_empty(&self) -> bool {
68 self.entries().next().is_none()
69 }
70
71 pub fn entries(&self) -> impl Iterator<Item = JointEntryRef> {
75 self.merge_entries()
76 .flat_map(|(_, merge)| merge.ignore_tombstones())
77 }
78
79 fn merge_entries(&self) -> impl Iterator<Item = (&str, Merge)> {
80 let entries = self.versions.values().map(|directory| directory.entries());
81 let entries = SortedUnion::new(entries, |entry| entry.name());
82 let entries = Accumulate::new(entries, |entry| entry.name());
83 entries.map(|(name, entries)| {
84 (
85 name,
86 Merge::new(entries.into_iter(), self.local_branch.as_ref()),
87 )
88 })
89 }
90
91 pub fn lookup<'a>(&'a self, name: &'a str) -> impl Iterator<Item = JointEntryRef<'a>> + 'a {
94 Merge::new(
95 self.versions
96 .values()
97 .filter_map(move |dir| dir.lookup(name).ok()),
98 self.local_branch.as_ref(),
99 )
100 .ignore_tombstones()
101 }
102
103 pub fn lookup_unique<'a>(&'a self, name: &'a str) -> Result<JointEntryRef<'a>> {
114 let mut entries =
116 Merge::new(self.entry_versions(name), self.local_branch.as_ref()).ignore_tombstones();
117 if let Some(entry) = entries.next() {
118 if entries.next().is_none() {
119 return Ok(entry);
120 } else {
121 return Err(Error::AmbiguousEntry);
122 }
123 }
124
125 let (name, branch_id_prefix) = conflict::parse_unique_name(name);
128 let branch_id_prefix = branch_id_prefix.ok_or(Error::EntryNotFound)?;
129
130 let mut entries = Merge::new(self.entry_versions(name), self.local_branch.as_ref())
131 .ignore_tombstones()
132 .filter(|entry| entry.first_branch().id().starts_with(&branch_id_prefix));
133
134 let first = entries.next().ok_or(Error::EntryNotFound)?;
135
136 if entries.next().is_none() {
137 Ok(first)
138 } else {
139 Err(Error::AmbiguousEntry)
140 }
141 }
142
143 #[instrument(skip(self), err(Debug))]
145 pub fn lookup_version(&self, name: &'_ str, branch_id: &'_ PublicKey) -> Result<FileRef> {
146 self.versions
147 .get(branch_id)
148 .ok_or(Error::EntryNotFound)
149 .and_then(|dir| dir.lookup(name))
150 .and_then(|entry| entry.file())
151 }
152
153 #[allow(clippy::len_without_is_empty)]
156 pub fn len(&self) -> u64 {
157 self.versions.values().map(|dir| dir.len()).sum()
158 }
159
160 pub fn has_local_version(&self) -> bool {
161 self.local_branch
162 .as_ref()
163 .map(|local_branch| self.versions.contains_key(local_branch.id()))
164 .unwrap_or(false)
165 }
166
167 pub async fn cd(&self, path: impl AsRef<Utf8Path>) -> Result<Self> {
171 let mut curr = Cow::Borrowed(self);
172
173 for component in path.as_ref().components() {
174 match component {
175 Utf8Component::RootDir | Utf8Component::CurDir => (),
176 Utf8Component::Normal(name) => {
177 let next = curr
178 .lookup(name)
179 .find_map(|entry| entry.directory().ok())
180 .ok_or(Error::EntryNotFound)?
181 .open()
182 .await?;
183 curr = Cow::Owned(next);
184 }
185 Utf8Component::ParentDir | Utf8Component::Prefix(_) => {
186 return Err(Error::OperationNotSupported)
187 }
188 }
189 }
190
191 Ok(curr.into_owned())
192 }
193
194 pub async fn remove_entry(&mut self, name: &str) -> Result<()> {
197 self.remove_entries(Pattern::Unique(name)).await
198 }
199
200 pub async fn remove_entry_recursively(&mut self, name: &str) -> Result<()> {
203 self.remove_entries_recursively(Pattern::Unique(name)).await
204 }
205
206 async fn remove_entries(&mut self, pattern: Pattern<'_>) -> Result<()> {
207 let local_branch = self.local_branch.as_ref().ok_or(Error::PermissionDenied)?;
208
209 let entries: Vec<_> = pattern
210 .apply(self)?
211 .map(|entry| {
212 let name = entry.name().to_owned();
213 let branch_id = match &entry {
214 JointEntryRef::File(entry) => *entry.branch().id(),
215 JointEntryRef::Directory(_) => *local_branch.id(),
216 };
217 let vv = entry.version_vector().into_owned();
218
219 (name, branch_id, vv)
220 })
221 .collect();
222
223 let local_version = self.fork().await?;
224
225 for (name, branch_id, vv) in entries {
226 local_version.remove_entry(&name, &branch_id, vv).await?;
227 }
228
229 Ok(())
230 }
231
232 #[async_recursion]
233 async fn remove_entries_recursively<'a>(&'a mut self, pattern: Pattern<'a>) -> Result<()> {
234 for entry in pattern.apply(self)?.filter_map(|e| e.directory().ok()) {
235 let mut dir = entry
236 .open_with(MissingVersionStrategy::Skip, DirectoryFallback::Disabled)
237 .await?;
238 dir.remove_entries_recursively(Pattern::All).await?;
239 }
240
241 if let Some(local_version) = self.local_version_mut() {
242 local_version.refresh().await?;
243 }
244
245 self.remove_entries(pattern).await
246 }
247
248 #[async_recursion]
254 pub async fn merge(&mut self) -> Result<Directory> {
255 let old_version_vector = if let Some(local_version) = self.local_version() {
256 local_version.version_vector().await?
257 } else {
258 VersionVector::new()
259 };
260
261 let new_version_vector = self.merge_version_vectors().await?;
262
263 if !old_version_vector.is_empty() && old_version_vector >= new_version_vector {
264 tracing::trace!(old = ?old_version_vector, "Merge not started - already up to date");
266 return Ok(self.local_version().unwrap().clone());
269 } else {
270 tracing::trace!(old = ?old_version_vector, new = ?new_version_vector, "Merge started");
271 }
272
273 let local_version = self.fork().await?;
274 let local_branch = local_version.branch().clone();
275
276 let mut conflict = false;
277 let mut check_for_removal = Vec::new();
278
279 for (name, merge) in self.merge_entries() {
280 match merge {
281 Merge::Existing(existing) => {
282 for entry in existing {
283 match entry {
284 JointEntryRef::File(entry) => {
285 match entry.fork(&local_branch).await {
286 Ok(()) => {}
287 Err(Error::EntryExists) => {
288 conflict = true;
293 }
294 Err(error) => return Err(error),
295 }
296 }
297 JointEntryRef::Directory(entry) => {
298 let mut dir = entry
299 .open_with(
300 MissingVersionStrategy::Fail,
301 DirectoryFallback::Disabled,
302 )
303 .await?;
304 match dir
305 .merge()
306 .instrument(tracing::info_span!("dir", message = name))
307 .await
308 {
309 Ok(_) => (),
310 Err(Error::AmbiguousEntry) => {
311 conflict = true;
312 }
313 Err(error) => return Err(error),
314 }
315 }
316 }
317 }
318 }
319 Merge::Tombstone(tombstone) => {
320 check_for_removal.push((name.to_owned(), tombstone));
321 }
322 }
323 }
324
325 let local_version = self.local_version_mut().unwrap();
328 local_version.refresh().await?;
329
330 for (name, tombstone) in check_for_removal {
331 local_version.create_tombstone(&name, tombstone).await?;
332 }
333
334 if !conflict && local_version.is_root() {
337 directory::bump_root(&local_branch, new_version_vector).await?;
338 }
339
340 if tracing::enabled!(tracing::Level::TRACE) {
341 let vv = local_version.version_vector().await?;
342 tracing::trace!(?vv, ?conflict, "Merge completed");
343 }
344
345 if conflict {
346 Err(Error::AmbiguousEntry)
347 } else {
348 Ok(local_version.clone())
349 }
350 }
351
352 async fn merge_version_vectors(&self) -> Result<VersionVector> {
354 let mut outcome = VersionVector::new();
355
356 for version in self.versions.values() {
357 outcome.merge(&version.version_vector().await?);
358 }
359
360 Ok(outcome)
361 }
362
363 async fn fork(&mut self) -> Result<&mut Directory> {
364 let local_branch = self.local_branch.as_ref().ok_or(Error::PermissionDenied)?;
365
366 let mut local_version = None;
367
368 for (branch_id, version) in &self.versions {
371 if branch_id == local_branch.id() {
372 continue;
373 }
374
375 local_version = Some(version.fork(local_branch).await?);
376 }
377
378 if let Some(local_version) = local_version {
379 self.versions.insert(*local_branch.id(), local_version);
380 }
381
382 self.versions
385 .get_mut(local_branch.id())
386 .ok_or(Error::EntryNotFound)
387 }
388
389 fn entry_versions<'a>(&'a self, name: &'a str) -> impl Iterator<Item = EntryRef<'a>> {
390 self.versions
391 .values()
392 .filter_map(move |v| v.lookup(name).ok())
393 }
394}
395
396impl fmt::Debug for JointDirectory {
397 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
398 f.debug_struct("JointDirectory").finish()
399 }
400}
401
402#[derive(Debug)]
403pub enum JointEntryRef<'a> {
404 File(JointFileRef<'a>),
405 Directory(JointDirectoryRef<'a>),
406}
407
408impl<'a> JointEntryRef<'a> {
409 pub fn name(&self) -> &'a str {
410 match self {
411 Self::File(r) => r.name(),
412 Self::Directory(r) => r.name(),
413 }
414 }
415
416 pub fn unique_name(&self) -> Cow<'a, str> {
417 match self {
418 Self::File(r) => r.unique_name(),
419 Self::Directory(r) => r.unique_name(),
420 }
421 }
422
423 pub fn entry_type(&self) -> EntryType {
424 match self {
425 Self::File(_) => EntryType::File,
426 Self::Directory(_) => EntryType::Directory,
427 }
428 }
429
430 pub fn version_vector(&'a self) -> Cow<'a, VersionVector> {
431 match self {
432 Self::File(r) => Cow::Borrowed(r.version_vector()),
433 Self::Directory(r) => Cow::Owned(r.version_vector()),
434 }
435 }
436
437 pub fn file(self) -> Result<FileRef<'a>> {
438 match self {
439 Self::File(r) => Ok(r.file),
440 Self::Directory(_) => Err(Error::EntryIsDirectory),
441 }
442 }
443
444 pub fn directory(self) -> Result<JointDirectoryRef<'a>> {
445 match self {
446 Self::Directory(r) => Ok(r),
447 Self::File(_) => Err(Error::EntryIsFile),
448 }
449 }
450
451 fn first_branch(&self) -> &Branch {
452 match self {
453 Self::File(r) => r.branch(),
454 Self::Directory(r) => r.first_version().branch(),
455 }
456 }
457}
458
459#[derive(Debug)]
460pub struct JointFileRef<'a> {
461 file: FileRef<'a>,
462 needs_disambiguation: bool,
463}
464
465impl<'a> JointFileRef<'a> {
466 pub fn name(&self) -> &'a str {
467 self.file.name()
468 }
469
470 pub fn unique_name(&self) -> Cow<'a, str> {
471 if self.needs_disambiguation {
472 Cow::from(conflict::create_unique_name(
473 self.name(),
474 self.file.branch().id(),
475 ))
476 } else {
477 Cow::from(self.name())
478 }
479 }
480
481 pub async fn open(&self) -> Result<File> {
482 self.file.open().await
483 }
484
485 pub(crate) async fn fork(&self, dst_branch: &Branch) -> Result<()> {
486 self.file.fork(dst_branch).await
487 }
488
489 pub fn version_vector(&self) -> &'a VersionVector {
490 self.file.version_vector()
491 }
492
493 pub fn branch(&self) -> &Branch {
494 self.file.branch()
495 }
496
497 pub fn parent(&self) -> &Directory {
498 self.file.parent()
499 }
500
501 pub fn inner(&self) -> FileRef<'a> {
502 self.file
503 }
504}
505
506pub struct JointDirectoryRef<'a> {
507 versions: Vec<DirectoryRef<'a>>,
508 local_branch: Option<&'a Branch>,
509 needs_disambiguation: bool,
510}
511
512impl<'a> JointDirectoryRef<'a> {
513 fn new(
514 versions: Vec<DirectoryRef<'a>>,
515 local_branch: Option<&'a Branch>,
516 needs_disambiguation: bool,
517 ) -> Option<Self> {
518 if versions.is_empty() {
519 None
520 } else {
521 Some(Self {
522 versions,
523 local_branch,
524 needs_disambiguation,
525 })
526 }
527 }
528
529 pub fn name(&self) -> &'a str {
530 self.first_version().name()
531 }
532
533 pub fn unique_name(&self) -> Cow<'a, str> {
534 if self.needs_disambiguation {
535 Cow::from(conflict::create_unique_name(
536 self.name(),
537 self.first_version().branch().id(),
538 ))
539 } else {
540 Cow::from(self.name())
541 }
542 }
543
544 pub fn version_vector(&self) -> VersionVector {
545 self.versions
546 .iter()
547 .fold(VersionVector::new(), |mut vv, dir| {
548 vv.merge(dir.version_vector());
549 vv
550 })
551 }
552
553 pub async fn open(&self) -> Result<JointDirectory> {
554 self.open_with(MissingVersionStrategy::Skip, DirectoryFallback::Enabled)
555 .await
556 }
557
558 pub(crate) async fn open_with(
559 &self,
560 missing_version_strategy: MissingVersionStrategy,
561 fallback: DirectoryFallback,
562 ) -> Result<JointDirectory> {
563 let mut versions = Vec::new();
564 for version in &self.versions {
565 match version.open(fallback).await {
566 Ok(open_dir) => versions.push(open_dir),
567 Err(e)
568 if self
569 .local_branch
570 .map(|local_branch| version.branch().id() == local_branch.id())
571 .unwrap_or(false) =>
572 {
573 return Err(e)
574 }
575 Err(Error::Store(store::Error::BlockNotFound))
576 if matches!(missing_version_strategy, MissingVersionStrategy::Skip) =>
577 {
578 continue;
582 }
583 Err(e) => return Err(e),
584 }
585 }
586
587 Ok(JointDirectory::new(self.local_branch.cloned(), versions))
588 }
589
590 pub(crate) fn versions(&self) -> &[DirectoryRef] {
591 &self.versions
592 }
593
594 fn first_version(&self) -> &DirectoryRef<'a> {
595 self.versions
596 .first()
597 .expect("joint directory must contain at least one directory")
598 }
599}
600
601impl fmt::Debug for JointDirectoryRef<'_> {
602 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
603 f.debug_struct("JointDirectoryRef")
604 .field("name", &self.name())
605 .finish()
606 }
607}
608
609#[derive(Copy, Clone)]
611pub enum MissingVersionStrategy {
612 Skip,
614 Fail,
616}
617
618#[derive(Clone)]
623enum Merge<'a> {
624 Existing(Existing<'a>),
626 Tombstone(EntryTombstoneData),
628}
629
630#[derive(Default, Clone)]
631struct Existing<'a> {
632 files: VecDeque<FileRef<'a>>,
636 directories: Vec<DirectoryRef<'a>>,
637 needs_disambiguation: bool,
638 local_branch: Option<&'a Branch>,
639}
640
641impl<'a> Iterator for Existing<'a> {
642 type Item = JointEntryRef<'a>;
643
644 fn next(&mut self) -> Option<Self::Item> {
645 if let Some(dir) = JointDirectoryRef::new(
646 mem::take(&mut self.directories),
647 self.local_branch,
648 self.needs_disambiguation,
649 ) {
650 return Some(JointEntryRef::Directory(dir));
651 }
652
653 Some(JointEntryRef::File(JointFileRef {
654 file: self.files.pop_front()?,
655 needs_disambiguation: self.needs_disambiguation,
656 }))
657 }
658}
659
660impl<'a> Merge<'a> {
661 fn new<I>(entries: I, local_branch: Option<&'a Branch>) -> Self
664 where
665 I: Iterator<Item = EntryRef<'a>>,
666 {
667 let mut files = VecDeque::new();
668 let mut directories = vec![];
669 let mut tombstone: Option<EntryTombstoneData> = None;
670
671 let entries = versioned::keep_maximal(entries, PreferBranch(local_branch.map(Branch::id)));
673
674 for entry in entries {
675 match entry {
676 EntryRef::File(file) => files.push_back(file),
677 EntryRef::Directory(dir) => directories.push(dir),
678 EntryRef::Tombstone(_) if !files.is_empty() || !directories.is_empty() => continue,
679 EntryRef::Tombstone(new_tombstone) => {
680 let new_tombstone = if let Some(mut old_tombstone) = tombstone.take() {
681 old_tombstone.merge(new_tombstone.data());
682 old_tombstone
683 } else {
684 new_tombstone.data().clone()
685 };
686
687 tombstone = Some(new_tombstone);
688 }
689 }
690 }
691
692 let needs_disambiguation = files.len() + if directories.is_empty() { 0 } else { 1 } > 1;
693
694 match tombstone {
695 Some(tombstone) if files.is_empty() && directories.is_empty() => {
696 Self::Tombstone(tombstone)
697 }
698 Some(_) | None => Self::Existing(Existing {
699 files,
700 directories,
701 needs_disambiguation,
702 local_branch,
703 }),
704 }
705 }
706
707 fn ignore_tombstones(self) -> Existing<'a> {
708 match self {
709 Self::Existing(existing) => existing,
710 Self::Tombstone(_) => Existing::default(),
711 }
712 }
713}
714
715enum Pattern<'a> {
716 All,
718 Unique(&'a str),
720}
721
722impl<'a> Pattern<'a> {
723 fn apply(&self, dir: &'a JointDirectory) -> Result<impl Iterator<Item = JointEntryRef<'a>>> {
724 match self {
725 Self::All => Ok(Either::Left(dir.entries())),
726 Self::Unique(name) => dir
727 .lookup_unique(name)
728 .map(|entry| Either::Right(iter::once(entry))),
729 }
730 }
731}