ouisync/repository/
mod.rs

1pub(crate) mod monitor;
2
3mod credentials;
4mod metadata;
5mod params;
6mod vault;
7mod worker;
8
9#[cfg(test)]
10mod tests;
11
12pub use self::{credentials::Credentials, metadata::Metadata, params::RepositoryParams};
13
14pub(crate) use self::{
15    metadata::{data_version, quota},
16    vault::Vault,
17};
18
19use self::monitor::RepositoryMonitor;
20use crate::{
21    access_control::{Access, AccessChange, AccessKeys, AccessMode, AccessSecrets, LocalSecret},
22    block_tracker::BlockRequestMode,
23    branch::{Branch, BranchShared},
24    crypto::{PasswordSalt, sign::PublicKey},
25    db::{self, DatabaseId},
26    debug::DebugPrinter,
27    directory::{Directory, DirectoryFallback, DirectoryLocking, EntryRef, EntryType},
28    error::{Error, Result},
29    event::{Event, EventSender},
30    file::File,
31    joint_directory::{JointDirectory, JointEntryRef, MissingVersionStrategy},
32    path,
33    progress::Progress,
34    protocol::{BLOCK_SIZE, RootNodeFilter, StorageSize},
35    store,
36    sync::stream::Throttle,
37    version_vector::VersionVector,
38};
39use camino::Utf8Path;
40use deadlock::{BlockingMutex, BlockingRwLock};
41use futures_util::{StreamExt, stream};
42use futures_util::{TryStreamExt, future};
43use metrics::{NoopRecorder, Recorder};
44use scoped_task::ScopedJoinHandle;
45use state_monitor::StateMonitor;
46use std::{
47    borrow::Cow,
48    path::{Path, PathBuf},
49    pin::pin,
50    sync::Arc,
51    time::SystemTime,
52};
53use tokio::{
54    sync::broadcast::{self, error::RecvError},
55    time::Duration,
56};
57use tracing::instrument::Instrument;
58
59const EVENT_CHANNEL_CAPACITY: usize = 10000;
60
61pub struct Repository {
62    shared: Arc<Shared>,
63    worker_handle: BlockingMutex<Option<ScopedJoinHandle<()>>>,
64    progress_reporter_handle: BlockingMutex<Option<ScopedJoinHandle<()>>>,
65}
66
67/// List of repository database files. Includes the main db file and any auxiliary files.
68///
69/// The aux files don't always exists but when they do and one wants to rename, move or delete the
70/// repository, they should rename/move/delete these files as well.
71///
72/// Note ideally the aux files should be deleted automatically when the repo has been closed. But
73/// because of a [bug][1] in sqlx, they are sometimes not and need to be handled
74/// (move,deleted) manually. When the bug is fixed, this function can be removed.
75///
76/// [1]: https://github.com/launchbadge/sqlx/issues/3217
77pub fn database_files(store_path: impl AsRef<Path>) -> Vec<PathBuf> {
78    // Sqlite database consists of up to three files: main db (always present), WAL and WAL-index.
79    ["", "-wal", "-shm"]
80        .into_iter()
81        .map(|suffix| {
82            let mut path = store_path.as_ref().as_os_str().to_owned();
83            path.push(suffix);
84            path.into()
85        })
86        .collect()
87}
88
89impl Repository {
90    /// Creates a new repository.
91    pub async fn create(params: &RepositoryParams<impl Recorder>, access: Access) -> Result<Self> {
92        let pool = params.create().await?;
93        let device_id = params.device_id();
94        let monitor = params.monitor();
95
96        let mut tx = pool.begin_write().await?;
97
98        let local_keys = metadata::initialize_access_secrets(&mut tx, &access).await?;
99        let writer_id =
100            metadata::get_or_generate_writer_id(&mut tx, local_keys.write.as_deref()).await?;
101        metadata::set_device_id(&mut tx, &device_id).await?;
102
103        tx.commit().await?;
104
105        let credentials = Credentials {
106            secrets: access.secrets(),
107            writer_id,
108        };
109
110        Self::new(pool, credentials, monitor).init().await
111    }
112
113    /// Opens an existing repository.
114    pub async fn open(
115        params: &RepositoryParams<impl Recorder>,
116        local_secret: Option<LocalSecret>,
117        access_mode: AccessMode,
118    ) -> Result<Self> {
119        let pool = params.open().await?;
120        let monitor = params.monitor();
121        let device_id = params.device_id();
122
123        let mut tx = pool.begin_write().await?;
124
125        let (secrets, local_key) =
126            metadata::get_access_secrets(&mut tx, local_secret.as_ref()).await?;
127
128        let secrets = secrets.with_mode(access_mode);
129
130        let writer_id = if metadata::check_device_id(&mut tx, &device_id).await? {
131            if secrets.can_write() {
132                metadata::get_or_generate_writer_id(&mut tx, local_key.as_deref()).await?
133            } else {
134                metadata::generate_writer_id()
135            }
136        } else {
137            // Device id changed, likely because the repo database has been transferred to a
138            // different device. We need to generate a new writer id.
139            //
140            // Note we need to do this even when not currently opening the repo in write mode. This
141            // is so that when the access mode is subsequently switched to write
142            // (with [set_access_mode]) we don't end up using the wrong writer_id.
143            let writer_id = metadata::generate_writer_id();
144
145            metadata::set_device_id(&mut tx, &device_id).await?;
146            metadata::set_writer_id(&mut tx, &writer_id, local_key.as_deref()).await?;
147
148            writer_id
149        };
150
151        tx.commit().await?;
152
153        let credentials = Credentials { secrets, writer_id };
154
155        Self::new(pool, credentials, monitor).init().await
156    }
157
158    fn new(pool: db::Pool, credentials: Credentials, monitor: RepositoryMonitor) -> Self {
159        Self {
160            shared: Arc::new(Shared::new(pool, credentials, monitor)),
161            worker_handle: BlockingMutex::new(None),
162            progress_reporter_handle: BlockingMutex::new(None),
163        }
164    }
165
166    async fn init(self) -> Result<Self> {
167        let credentials = self.credentials();
168
169        if let Some(keys) = credentials
170            .secrets
171            .write_secrets()
172            .map(|secrets| &secrets.write_keys)
173        {
174            self.shared
175                .vault
176                .store()
177                .migrate_data(credentials.writer_id, keys)
178                .await?;
179        }
180
181        {
182            let mut conn = self.shared.vault.store().db().acquire().await?;
183            if let Some(block_expiration) = metadata::block_expiration::get(&mut conn).await? {
184                self.shared
185                    .vault
186                    .set_block_expiration(Some(block_expiration))?;
187            }
188        }
189
190        tracing::debug!(
191            parent: self.shared.vault.monitor.span(),
192            access = ?credentials.secrets.access_mode(),
193            writer_id = ?credentials.writer_id,
194            "Repository opened"
195        );
196
197        *self.worker_handle.lock().unwrap() = Some(spawn_worker(self.shared.clone()));
198
199        *self.progress_reporter_handle.lock().unwrap() = Some(scoped_task::spawn(
200            report_sync_progress(self.shared.vault.clone())
201                .instrument(self.shared.vault.monitor.span().clone()),
202        ));
203
204        Ok(self)
205    }
206
207    pub async fn database_id(&self) -> Result<DatabaseId> {
208        Ok(metadata::get_or_generate_database_id(self.db()).await?)
209    }
210
211    pub async fn requires_local_secret_for_reading(&self) -> Result<bool> {
212        let mut conn = self.db().acquire().await?;
213        Ok(metadata::requires_local_secret_for_reading(&mut conn).await?)
214    }
215
216    pub async fn requires_local_secret_for_writing(&self) -> Result<bool> {
217        let mut conn = self.db().acquire().await?;
218        Ok(metadata::requires_local_secret_for_writing(&mut conn).await?)
219    }
220
221    /// Sets, unsets or changes local secrets for accessing the repository or disables the given
222    /// access mode.
223    ///
224    /// In order to enable or change a given access mode the repository must currently be in at
225    /// least that access mode. Disabling an access mode doesn't have this restriction.
226    ///
227    /// Disabling a given access mode makes the repo impossible to be opened in that mode anymore.
228    /// However, when the repo is currently in that mode it still remains in it until the repo is
229    /// closed.
230    ///
231    /// To restore a disabled mode the repo must first be put into that mode using
232    /// [Self::set_credentials()] where the `Credentials` must be obtained from `AccessSecrets` with
233    /// at least the mode one wants to restore.
234    ///
235    /// If `read` or `write` is `None` then no change is made to that mode. If both are `None` then
236    /// this function is a no-op.
237    ///
238    /// Disabling the read mode while keeping write mode enabled is allowed but not very useful as
239    /// write mode also grants read access.
240    pub async fn set_access(
241        &self,
242        read_change: Option<AccessChange>,
243        write_change: Option<AccessChange>,
244    ) -> Result<()> {
245        let mut tx = self.db().begin_write().await?;
246
247        if let Some(change) = read_change {
248            self.set_read_access(&mut tx, change).await?;
249        }
250
251        if let Some(change) = write_change {
252            self.set_write_access(&mut tx, change).await?;
253        }
254
255        tx.commit().await?;
256
257        Ok(())
258    }
259
260    async fn set_read_access(
261        &self,
262        tx: &mut db::WriteTransaction,
263        change: AccessChange,
264    ) -> Result<()> {
265        let local = match &change {
266            AccessChange::Enable(Some(local_secret)) => {
267                Some(metadata::secret_to_key_and_salt(local_secret))
268            }
269            AccessChange::Enable(None) => None,
270            AccessChange::Disable => {
271                metadata::remove_read_key(tx).await?;
272                return Ok(());
273            }
274        };
275
276        let (id, read_key) = {
277            let cred = self.shared.credentials.read().unwrap();
278            (
279                *cred.secrets.id(),
280                cred.secrets
281                    .read_key()
282                    .ok_or(Error::PermissionDenied)?
283                    .clone(),
284            )
285        };
286
287        metadata::set_read_key(
288            tx,
289            &id,
290            &read_key,
291            local.as_ref().map(|(k, s)| (k.as_ref(), s.as_ref())),
292        )
293        .await?;
294
295        Ok(())
296    }
297
298    async fn set_write_access(
299        &self,
300        tx: &mut db::WriteTransaction,
301        change: AccessChange,
302    ) -> Result<()> {
303        let local = match &change {
304            AccessChange::Enable(Some(local_secret)) => {
305                Some(metadata::secret_to_key_and_salt(local_secret))
306            }
307            AccessChange::Enable(None) => None,
308            AccessChange::Disable => {
309                metadata::remove_write_key(tx).await?;
310                return Ok(());
311            }
312        };
313
314        let (write_secrets, writer_id) = {
315            let cred = self.shared.credentials.read().unwrap();
316            (
317                cred.secrets
318                    .write_secrets()
319                    .ok_or(Error::PermissionDenied)?
320                    .clone(),
321                cred.writer_id,
322            )
323        };
324
325        metadata::set_write_key(
326            tx,
327            &write_secrets,
328            local.as_ref().map(|(k, s)| (k.as_ref(), s.as_ref())),
329        )
330        .await?;
331        metadata::set_writer_id(tx, &writer_id, local.as_ref().map(|(k, _)| k.as_ref())).await?;
332
333        Ok(())
334    }
335
336    /// Gets the current credentials of this repository.
337    ///
338    /// See also [Self::set_credentials()].
339    pub fn credentials(&self) -> Credentials {
340        self.shared.credentials.read().unwrap().clone()
341    }
342
343    pub fn secrets(&self) -> AccessSecrets {
344        self.shared.credentials.read().unwrap().secrets.clone()
345    }
346
347    /// Gets the current access mode of this repository.
348    pub fn access_mode(&self) -> AccessMode {
349        self.shared
350            .credentials
351            .read()
352            .unwrap()
353            .secrets
354            .access_mode()
355    }
356
357    /// Switches the repository to the given mode.
358    ///
359    /// The actual mode the repository gets switched to is the higher of the current access mode
360    /// and the mode provided by `local_key` but at most the mode specified in `access_mode`
361    /// ("higher" means according to `AccessMode`'s `Ord` impl, that is: Write > Read > Blind).
362    pub async fn set_access_mode(
363        &self,
364        access_mode: AccessMode,
365        local_secret: Option<LocalSecret>,
366    ) -> Result<()> {
367        // Try to use the current secrets but fall back to the secrets stored in the metadata if the
368        // current ones are insufficient and the stored ones have higher access mode.
369        let old_secrets = {
370            let creds = self.shared.credentials.read().unwrap();
371
372            if creds.secrets.access_mode() == access_mode {
373                return Ok(());
374            }
375
376            creds.secrets.clone()
377        };
378
379        let mut tx = self.db().begin_write().await?;
380
381        let (secrets, local_key) = if old_secrets.access_mode() >= access_mode {
382            (old_secrets, None)
383        } else {
384            let (new_secrets, local_key) =
385                metadata::get_access_secrets(&mut tx, local_secret.as_ref()).await?;
386
387            if new_secrets.access_mode() > old_secrets.access_mode() {
388                (new_secrets, local_key)
389            } else {
390                (old_secrets, None)
391            }
392        };
393
394        let secrets = secrets.with_mode(access_mode);
395
396        let writer_id = if secrets.can_write() {
397            metadata::get_or_generate_writer_id(&mut tx, local_key.as_deref()).await?
398        } else {
399            metadata::generate_writer_id()
400        };
401
402        tx.commit().await?;
403
404        // Data migration can only be applied in write mode so apply any pending ones if we just
405        // switched to write mode.
406        if let Some(write_keys) = secrets.write_secrets().map(|secrets| &secrets.write_keys) {
407            self.shared
408                .vault
409                .store()
410                .migrate_data(writer_id, write_keys)
411                .await?;
412        }
413
414        self.update_credentials(Credentials { secrets, writer_id });
415
416        Ok(())
417    }
418
419    /// Overrides the current credentials of this repository.
420    ///
421    /// This is useful for moving/renaming the repo database or to restore access which has been
422    /// either disabled or it's local secret lost.
423    ///
424    /// # Move/rename the repo db
425    ///
426    /// 1. Obtain the current credentials with [Self::credentials()] and keep them locally.
427    /// 2. Close the repo.
428    /// 3. Rename the repo database files(s).
429    /// 4. Open the repo from its new location in blind mode.
430    /// 5. Restore the credentials from step 1 with [Self::set_credentials()].
431    ///
432    /// # Restore access
433    ///
434    /// 1. Get the `AccessSecrets` the repository was originally created from (e.g., by extracting
435    ///    them from the original `ShareToken`).
436    /// 2. Construct `Credentials` using this access secrets and a random writer id.
437    /// 3. Restore the credentials with [Self::set_credentials()].
438    /// 4. Enable/change the access with [Self::set_access()].
439    pub async fn set_credentials(&self, credentials: Credentials) -> Result<()> {
440        // Check the credentials are actually for this repository
441        let expected_id = {
442            let mut conn = self.db().acquire().await?;
443            metadata::get_repository_id(&mut conn).await?
444        };
445
446        if credentials.secrets.id() != &expected_id {
447            return Err(Error::PermissionDenied);
448        }
449
450        if let Some(write_secrets) = credentials.secrets.write_secrets() {
451            self.shared
452                .vault
453                .store()
454                .migrate_data(credentials.writer_id, &write_secrets.write_keys)
455                .await?;
456        }
457
458        self.update_credentials(credentials);
459
460        Ok(())
461    }
462
463    pub async fn unlock_secrets(&self, local_secret: LocalSecret) -> Result<AccessSecrets> {
464        let mut tx = self.db().begin_write().await?;
465        Ok(metadata::get_access_secrets(&mut tx, Some(&local_secret))
466            .await?
467            .0)
468    }
469
470    /// Get accessor for repository metadata. The metadata are arbitrary key-value entries that are
471    /// stored inside the repository but not synced to other replicas.
472    pub fn metadata(&self) -> Metadata {
473        self.shared.vault.metadata()
474    }
475
476    /// Set the storage quota in bytes. Use `None` to disable quota. Default is `None`.
477    pub async fn set_quota(&self, quota: Option<StorageSize>) -> Result<()> {
478        self.shared.vault.set_quota(quota).await
479    }
480
481    /// Get the storage quota in bytes or `None` if no quota is set.
482    pub async fn quota(&self) -> Result<Option<StorageSize>> {
483        self.shared.vault.quota().await
484    }
485
486    /// Set the duration after which blocks start to expire (are deleted) when not used. Use `None`
487    /// to disable expiration. Default is `None`.
488    pub async fn set_block_expiration(&self, block_expiration: Option<Duration>) -> Result<()> {
489        let mut tx = self.db().begin_write().await?;
490        metadata::block_expiration::set(&mut tx, block_expiration).await?;
491        tx.commit().await?;
492
493        self.shared.vault.set_block_expiration(block_expiration)
494    }
495
496    /// Get the block expiration duration. `None` means block expiration is not set.
497    pub fn block_expiration(&self) -> Option<Duration> {
498        self.shared.vault.block_expiration()
499    }
500
501    /// Get the time when the last block expired or `None` if there are still some unexpired blocks.
502    /// If block expiration is not enabled, always return `None`.
503    pub fn last_block_expiration_time(&self) -> Option<SystemTime> {
504        self.shared.vault.last_block_expiration_time()
505    }
506
507    /// Get the total size of the data stored in this repository.
508    pub async fn size(&self) -> Result<StorageSize> {
509        self.shared.vault.size().await
510    }
511
512    pub fn handle(&self) -> RepositoryHandle {
513        RepositoryHandle {
514            vault: self.shared.vault.clone(),
515        }
516    }
517
518    pub async fn get_read_password_salt(&self) -> Result<PasswordSalt> {
519        let mut tx = self.db().begin_write().await?;
520        Ok(metadata::get_password_salt(&mut tx, metadata::KeyType::Read).await?)
521    }
522
523    pub async fn get_write_password_salt(&self) -> Result<PasswordSalt> {
524        let mut tx = self.db().begin_write().await?;
525        Ok(metadata::get_password_salt(&mut tx, metadata::KeyType::Write).await?)
526    }
527
528    /// Get the state monitor node of this repository.
529    pub fn monitor(&self) -> &StateMonitor {
530        self.shared.vault.monitor.node()
531    }
532
533    /// Export the repository to the given file.
534    ///
535    /// The repository is currently exported as read-only with no password. In the future other
536    /// modes might be added.
537    pub async fn export(&self, dst: &Path) -> Result<()> {
538        /// RAII to delete the exported repo in case the process fails or is interupted to avoid
539        /// exporting the repo with a wrong access mode.
540        struct Cleanup<'a> {
541            path: &'a Path,
542            armed: bool,
543        }
544
545        impl Drop for Cleanup<'_> {
546            fn drop(&mut self) {
547                if !self.armed {
548                    return;
549                }
550
551                if let Err(error) = std::fs::remove_file(self.path) {
552                    tracing::error!(
553                        path = ?self.path,
554                        ?error,
555                        "failed to delete partially exported repository",
556                    );
557                }
558            }
559        }
560
561        let mut cleanup = Cleanup {
562            path: dst,
563            armed: true,
564        };
565
566        // Export the repo to `dst`
567        self.shared.vault.store().export(dst).await?;
568
569        // Open it and strip write access and read password (if any).
570        let pool = db::open(dst).await?;
571        let credentials = self.credentials().with_mode(AccessMode::Read);
572        let access_mode = credentials.secrets.access_mode();
573        let monitor = RepositoryMonitor::new(StateMonitor::make_root(), &NoopRecorder);
574        let repo = Self::new(pool, credentials, monitor);
575
576        match access_mode {
577            AccessMode::Blind => {
578                repo.set_access(Some(AccessChange::Disable), Some(AccessChange::Disable))
579                    .await?
580            }
581            AccessMode::Read => {
582                repo.set_access(
583                    Some(AccessChange::Enable(None)),
584                    Some(AccessChange::Disable),
585                )
586                .await?
587            }
588            AccessMode::Write => unreachable!(),
589        }
590
591        repo.close().await?;
592
593        cleanup.armed = false;
594
595        Ok(())
596    }
597
598    /// Looks up an entry by its path. The path must be relative to the repository root.
599    /// If the entry exists, returns its `EntryType`, otherwise returns `EntryNotFound`.
600    pub async fn lookup_type<P: AsRef<Utf8Path>>(&self, path: P) -> Result<EntryType> {
601        match path::decompose(path.as_ref()) {
602            Some((parent, name)) => {
603                let parent = self.open_directory(parent).await?;
604                Ok(parent.lookup_unique(name)?.entry_type())
605            }
606            None => Ok(EntryType::Directory),
607        }
608    }
609
610    /// Opens a file at the given path (relative to the repository root)
611    pub async fn open_file<P: AsRef<Utf8Path>>(&self, path: P) -> Result<File> {
612        let (parent, name) = path::decompose(path.as_ref()).ok_or(Error::EntryIsDirectory)?;
613
614        self.cd(parent)
615            .await?
616            .lookup_unique(name)?
617            .file()?
618            .open()
619            .await
620    }
621
622    /// Open a specific version of the file at the given path.
623    pub async fn open_file_version<P: AsRef<Utf8Path>>(
624        &self,
625        path: P,
626        branch_id: &PublicKey,
627    ) -> Result<File> {
628        let (parent, name) = path::decompose(path.as_ref()).ok_or(Error::EntryIsDirectory)?;
629
630        self.cd(parent)
631            .await?
632            .lookup_version(name, branch_id)?
633            .open()
634            .await
635    }
636
637    /// Opens a directory at the given path (relative to the repository root)
638    pub async fn open_directory<P: AsRef<Utf8Path>>(&self, path: P) -> Result<JointDirectory> {
639        self.cd(path).await
640    }
641
642    /// Creates a new file at the given path.
643    pub async fn create_file<P: AsRef<Utf8Path>>(&self, path: P) -> Result<File> {
644        let file = self
645            .local_branch()?
646            .ensure_file_exists(path.as_ref())
647            .await?;
648
649        Ok(file)
650    }
651
652    /// Creates a new directory at the given path.
653    pub async fn create_directory<P: AsRef<Utf8Path>>(&self, path: P) -> Result<Directory> {
654        let dir = self
655            .local_branch()?
656            .ensure_directory_exists(path.as_ref())
657            .await?;
658
659        Ok(dir)
660    }
661
662    /// Removes the file or directory (must be empty) and flushes its parent directory.
663    pub async fn remove_entry<P: AsRef<Utf8Path>>(&self, path: P) -> Result<()> {
664        let (parent, name) = path::decompose(path.as_ref()).ok_or(Error::OperationNotSupported)?;
665        let mut parent = self.cd(parent).await?;
666        parent.remove_entry(name).await?;
667
668        Ok(())
669    }
670
671    /// Removes the file or directory (including its content) and flushes its parent directory.
672    pub async fn remove_entry_recursively<P: AsRef<Utf8Path>>(&self, path: P) -> Result<()> {
673        let (parent, name) = path::decompose(path.as_ref()).ok_or(Error::OperationNotSupported)?;
674        let mut parent = self.cd(parent).await?;
675        parent.remove_entry_recursively(name).await?;
676
677        Ok(())
678    }
679
680    /// Moves (renames) an entry from the source path to the destination path.
681    /// If both source and destination refer to the same entry, this is a no-op.
682    pub async fn move_entry<S: AsRef<Utf8Path>, D: AsRef<Utf8Path>>(
683        &self,
684        src: S,
685        dst: D,
686    ) -> Result<()> {
687        let (src_dir_path, src_name) = path::decompose(src.as_ref()).ok_or(Error::EntryNotFound)?;
688        let (dst_dir_path, dst_name) = path::decompose(dst.as_ref()).ok_or(Error::EntryNotFound)?;
689
690        let local_branch = self.local_branch()?;
691        let src_joint_dir = self.cd(src_dir_path).await?;
692
693        // If the src is in a remote branch, need to merge it into the local one first:
694        let (mut src_dir, src_name, src_type) = match src_joint_dir.lookup_unique(src_name)? {
695            JointEntryRef::File(entry) => {
696                let src_name = entry.name().to_string();
697
698                let mut file = entry.open().await?;
699                file.fork(local_branch.clone()).await?;
700
701                (file.parent().await?, Cow::Owned(src_name), EntryType::File)
702            }
703            JointEntryRef::Directory(entry) => {
704                let mut dir_to_move = entry
705                    .open_with(MissingVersionStrategy::Skip, DirectoryFallback::Disabled)
706                    .await?;
707
708                let src_dir = dir_to_move
709                    .merge()
710                    .await?
711                    .1
712                    .parent()
713                    .await?
714                    .ok_or(Error::OperationNotSupported /* can't move root */)?;
715
716                (src_dir, Cow::Borrowed(src_name), EntryType::Directory)
717            }
718        };
719
720        let src_entry = src_dir.lookup(&src_name)?.clone_data();
721
722        let mut dst_joint_dir = self.cd(&dst_dir_path).await?;
723        let dst_dir = dst_joint_dir
724            .local_version_mut()
725            .ok_or(Error::PermissionDenied)?;
726
727        let dst_old_entry = dst_dir.lookup(dst_name);
728
729        // Emulating the behaviour of the libc's `rename` function
730        // (https://www.man7.org/linux/man-pages/man2/rename.2.html)
731        let dst_old_vv = match (src_type, dst_old_entry) {
732            (EntryType::File | EntryType::Directory, Ok(EntryRef::Tombstone(old_entry))) => {
733                old_entry.version_vector().clone()
734            }
735            (EntryType::File | EntryType::Directory, Err(Error::EntryNotFound)) => {
736                VersionVector::new()
737            }
738            (EntryType::File | EntryType::Directory, Err(error)) => return Err(error),
739            (EntryType::File, Ok(EntryRef::File(old_entry))) => old_entry.version_vector().clone(),
740            (EntryType::Directory, Ok(EntryRef::Directory(old_entry))) => {
741                if old_entry
742                    .open(DirectoryFallback::Disabled)
743                    .await?
744                    .entries()
745                    .all(|entry| entry.is_tombstone())
746                {
747                    old_entry.version_vector().clone()
748                } else {
749                    return Err(Error::DirectoryNotEmpty);
750                }
751            }
752            (EntryType::File, Ok(EntryRef::Directory(_))) => return Err(Error::EntryIsDirectory),
753            (EntryType::Directory, Ok(EntryRef::File(_))) => return Err(Error::EntryIsFile),
754        };
755
756        let dst_vv = dst_old_vv
757            .merged(src_entry.version_vector())
758            .incremented(*local_branch.id());
759
760        src_dir
761            .move_entry(&src_name, src_entry, dst_dir, dst_name, dst_vv)
762            .await?;
763
764        Ok(())
765    }
766
767    /// Returns the local branch or `Error::PermissionDenied` if this repo doesn't have at least
768    /// read access.
769    pub fn local_branch(&self) -> Result<Branch> {
770        self.shared.local_branch()
771    }
772
773    /// Returns the branch corresponding to the given id or `Error::PermissionDenied. if this repo
774    /// doesn't have at least read access.
775    #[cfg(test)]
776    pub fn get_branch(&self, id: PublicKey) -> Result<Branch> {
777        self.shared.get_branch(id)
778    }
779
780    /// Returns version vector of the given branch. Works in all access moded.
781    pub async fn get_branch_version_vector(&self, writer_id: &PublicKey) -> Result<VersionVector> {
782        Ok(self
783            .shared
784            .vault
785            .store()
786            .acquire_read()
787            .await?
788            .load_latest_approved_root_node(writer_id, RootNodeFilter::Any)
789            .await?
790            .proof
791            .into_version_vector())
792    }
793
794    /// Returns the version vector calculated by merging the version vectors of all branches.
795    pub async fn get_merged_version_vector(&self) -> Result<VersionVector> {
796        Ok(self
797            .shared
798            .vault
799            .store()
800            .acquire_read()
801            .await?
802            .load_latest_approved_root_nodes()
803            .try_fold(VersionVector::default(), |mut merged, node| {
804                merged.merge(&node.proof.version_vector);
805                future::ready(Ok(merged))
806            })
807            .await?)
808    }
809
810    /// Subscribe to event notifications.
811    pub fn subscribe(&self) -> broadcast::Receiver<Event> {
812        self.shared.vault.event_tx.subscribe()
813    }
814
815    /// Gets the syncing progress of this repository (number of downloaded blocks / number of
816    /// all blocks)
817    pub async fn sync_progress(&self) -> Result<Progress> {
818        Ok(self.shared.vault.store().sync_progress().await?)
819    }
820
821    /// Check integrity of the stored data.
822    // TODO: Return more detailed info about any integrity violation.
823    pub async fn check_integrity(&self) -> Result<bool> {
824        Ok(self.shared.vault.store().check_integrity().await?)
825    }
826
827    // Opens the root directory across all branches as JointDirectory.
828    async fn root(&self) -> Result<JointDirectory> {
829        let local_branch = self.local_branch()?;
830        let branches = self.shared.load_branches().await?;
831
832        // If we are writer and the local branch doesn't exist yet in the db we include it anyway.
833        // This fixes a race condition when the local branch doesn't exist yet at the moment we
834        // load the branches but is subsequently created by merging a remote branch and the remote
835        // branch is then pruned.
836        let branches = if local_branch.keys().write().is_some()
837            && branches
838                .iter()
839                .all(|branch| branch.id() != local_branch.id())
840        {
841            let mut branches = branches;
842            branches.push(local_branch.clone());
843            branches
844        } else {
845            branches
846        };
847
848        let mut dirs = Vec::new();
849
850        for branch in branches {
851            let dir = match branch
852                .open_root(DirectoryLocking::Enabled, DirectoryFallback::Enabled)
853                .await
854            {
855                Ok(dir) => dir,
856                Err(error @ Error::Store(store::Error::BranchNotFound)) => {
857                    tracing::trace!(
858                        branch_id = ?branch.id(),
859                        ?error,
860                        "Failed to open root directory"
861                    );
862                    // Either this is the local branch which doesn't exist yet in the store or a
863                    // remote branch which has been pruned in the meantime. This is safe to ignore.
864                    continue;
865                }
866                Err(error @ Error::Store(store::Error::BlockNotFound)) => {
867                    tracing::trace!(
868                        branch_id = ?branch.id(),
869                        ?error,
870                        "Failed to open root directory"
871                    );
872                    // Some branch root blocks may not have been loaded across the network yet.
873                    // This is safe to ignore.
874                    continue;
875                }
876                Err(error) => {
877                    tracing::error!(
878                        branch_id = ?branch.id(),
879                        ?error,
880                        "Failed to open root directory"
881                    );
882                    return Err(error);
883                }
884            };
885
886            dirs.push(dir);
887        }
888
889        Ok(JointDirectory::new(Some(local_branch), dirs))
890    }
891
892    pub async fn cd<P: AsRef<Utf8Path>>(&self, path: P) -> Result<JointDirectory> {
893        self.root().await?.cd(path).await
894    }
895
896    /// Close all db connections held by this repository. After this function returns, any
897    /// subsequent operation on this repository that requires to access the db returns an error.
898    pub async fn close(&self) -> Result<()> {
899        // Abort and *await* the tasks to make sure that the state they are holding is definitely
900        // dropped before we return from this function.
901        for task in [&self.worker_handle, &self.progress_reporter_handle] {
902            let task = task.lock().unwrap().take();
903            if let Some(task) = task {
904                task.abort();
905                task.await.ok();
906            }
907        }
908
909        self.shared.vault.store().close().await?;
910
911        Ok(())
912    }
913
914    pub async fn debug_print_root(&self) {
915        self.debug_print(DebugPrinter::new()).await
916    }
917
918    pub async fn debug_print(&self, print: DebugPrinter) {
919        print.display(&"Repository");
920
921        let branches = match self.shared.load_branches().await {
922            Ok(branches) => branches,
923            Err(error) => {
924                print.display(&format_args!("failed to load branches: {error:?}"));
925                return;
926            }
927        };
928
929        let writer_id = self.shared.credentials.read().unwrap().writer_id;
930
931        for branch in branches {
932            let print = print.indent();
933            let local = if branch.id() == &writer_id {
934                " (local)"
935            } else {
936                ""
937            };
938            print.display(&format_args!(
939                "Branch ID: {:?}{}, root block ID:{:?}",
940                branch.id(),
941                local,
942                branch.root_block_id().await
943            ));
944            let print = print.indent();
945            print.display(&format_args!(
946                "/, vv: {:?}",
947                branch.version_vector().await.unwrap_or_default()
948            ));
949            branch.debug_print(print.indent()).await;
950        }
951
952        print.display(&"Index");
953        let print = print.indent();
954        self.shared.vault.debug_print(print).await;
955    }
956
957    /// Returns the total number of blocks in this repository. This is useful for diagnostics and
958    /// tests.
959    pub async fn count_blocks(&self) -> Result<u64> {
960        Ok(self.shared.vault.store().count_blocks().await?)
961    }
962
963    fn db(&self) -> &db::Pool {
964        self.shared.vault.store().db()
965    }
966
967    fn update_credentials(&self, credentials: Credentials) {
968        tracing::debug!(
969            parent: self.shared.vault.monitor.span(),
970            access = ?credentials.secrets.access_mode(),
971            writer_id = ?credentials.writer_id,
972            "Repository access mode changed"
973        );
974
975        self.shared
976            .vault
977            .block_tracker
978            .set_request_mode(block_request_mode(&credentials.secrets));
979
980        *self.shared.credentials.write().unwrap() = credentials;
981        *self.worker_handle.lock().unwrap() = Some(spawn_worker(self.shared.clone()));
982    }
983}
984
985pub struct RepositoryHandle {
986    pub(crate) vault: Vault,
987}
988
989struct Shared {
990    vault: Vault,
991    credentials: BlockingRwLock<Credentials>,
992    branch_shared: BranchShared,
993}
994
995impl Shared {
996    fn new(pool: db::Pool, credentials: Credentials, monitor: RepositoryMonitor) -> Self {
997        let event_tx = EventSender::new(EVENT_CHANNEL_CAPACITY);
998        let vault = Vault::new(*credentials.secrets.id(), event_tx, pool, monitor);
999
1000        vault
1001            .block_tracker
1002            .set_request_mode(block_request_mode(&credentials.secrets));
1003
1004        Self {
1005            vault,
1006            credentials: BlockingRwLock::new(credentials),
1007            branch_shared: BranchShared::new(),
1008        }
1009    }
1010
1011    pub fn local_branch(&self) -> Result<Branch> {
1012        let credentials = self.credentials.read().unwrap();
1013
1014        Ok(self.make_branch(
1015            credentials.writer_id,
1016            credentials.secrets.keys().ok_or(Error::PermissionDenied)?,
1017        ))
1018    }
1019
1020    pub fn get_branch(&self, id: PublicKey) -> Result<Branch> {
1021        let credentials = self.credentials.read().unwrap();
1022        let keys = credentials.secrets.keys().ok_or(Error::PermissionDenied)?;
1023
1024        // Only the local branch is writable.
1025        let keys = if id == credentials.writer_id {
1026            keys
1027        } else {
1028            keys.read_only()
1029        };
1030
1031        Ok(self.make_branch(id, keys))
1032    }
1033
1034    fn make_branch(&self, id: PublicKey, keys: AccessKeys) -> Branch {
1035        Branch::new(
1036            id,
1037            self.vault.store().clone(),
1038            keys,
1039            self.branch_shared.clone(),
1040            self.vault.event_tx.clone(),
1041        )
1042    }
1043
1044    pub async fn load_branches(&self) -> Result<Vec<Branch>> {
1045        self.vault
1046            .store()
1047            .acquire_read()
1048            .await?
1049            .load_latest_approved_root_nodes()
1050            .err_into()
1051            .and_then(|root_node| future::ready(self.get_branch(root_node.proof.writer_id)))
1052            .try_collect()
1053            .await
1054    }
1055}
1056
1057fn spawn_worker(shared: Arc<Shared>) -> ScopedJoinHandle<()> {
1058    let span = shared.vault.monitor.span().clone();
1059    scoped_task::spawn(worker::run(shared).instrument(span))
1060}
1061
1062async fn report_sync_progress(vault: Vault) {
1063    let mut prev_progress = Progress { value: 0, total: 0 };
1064
1065    let events = stream::unfold(vault.event_tx.subscribe(), |mut rx| async move {
1066        match rx.recv().await {
1067            Ok(_) | Err(RecvError::Lagged(_)) => Some(((), rx)),
1068            Err(RecvError::Closed) => None,
1069        }
1070    });
1071    let events = Throttle::new(events, Duration::from_secs(1));
1072    let mut events = pin!(events);
1073
1074    while events.next().await.is_some() {
1075        let next_progress = match vault.store().sync_progress().await {
1076            Ok(progress) => progress,
1077            Err(error) => {
1078                tracing::error!("Failed to retrieve sync progress: {:?}", error);
1079                continue;
1080            }
1081        };
1082
1083        if next_progress != prev_progress {
1084            prev_progress = next_progress;
1085            tracing::debug!(
1086                "Sync progress: {} bytes ({:.1})",
1087                prev_progress * BLOCK_SIZE as u64,
1088                prev_progress.percent()
1089            );
1090        }
1091    }
1092}
1093
1094fn block_request_mode(secrets: &AccessSecrets) -> BlockRequestMode {
1095    if secrets.can_read() {
1096        BlockRequestMode::Lazy
1097    } else {
1098        BlockRequestMode::Greedy
1099    }
1100}