ouisync/repository/
mod.rs

1pub(crate) mod monitor;
2
3mod credentials;
4mod metadata;
5mod params;
6mod vault;
7mod worker;
8
9#[cfg(test)]
10mod tests;
11
12pub use self::{credentials::Credentials, metadata::Metadata, params::RepositoryParams};
13
14pub(crate) use self::{
15    metadata::{data_version, quota},
16    vault::Vault,
17};
18
19use self::monitor::RepositoryMonitor;
20use crate::{
21    access_control::{Access, AccessChange, AccessKeys, AccessMode, AccessSecrets, LocalSecret},
22    block_tracker::BlockRequestMode,
23    branch::{Branch, BranchShared},
24    crypto::{sign::PublicKey, PasswordSalt},
25    db::{self, DatabaseId},
26    debug::DebugPrinter,
27    directory::{Directory, DirectoryFallback, DirectoryLocking, EntryRef, EntryType},
28    error::{Error, Result},
29    event::{Event, EventSender},
30    file::File,
31    joint_directory::{JointDirectory, JointEntryRef, MissingVersionStrategy},
32    path,
33    progress::Progress,
34    protocol::{RootNodeFilter, StorageSize, BLOCK_SIZE},
35    store,
36    sync::stream::Throttle,
37    version_vector::VersionVector,
38};
39use camino::Utf8Path;
40use deadlock::{BlockingMutex, BlockingRwLock};
41use futures_util::{future, TryStreamExt};
42use futures_util::{stream, StreamExt};
43use metrics::{NoopRecorder, Recorder};
44use scoped_task::ScopedJoinHandle;
45use state_monitor::StateMonitor;
46use std::{
47    borrow::Cow,
48    path::{Path, PathBuf},
49    pin::pin,
50    sync::Arc,
51    time::SystemTime,
52};
53use tokio::{
54    sync::broadcast::{self, error::RecvError},
55    time::Duration,
56};
57use tracing::instrument::Instrument;
58
59const EVENT_CHANNEL_CAPACITY: usize = 10000;
60
61pub struct Repository {
62    shared: Arc<Shared>,
63    worker_handle: BlockingMutex<Option<ScopedJoinHandle<()>>>,
64    progress_reporter_handle: BlockingMutex<Option<ScopedJoinHandle<()>>>,
65}
66
67/// List of repository database files. Includes the main db file and any auxiliary files.
68///
69/// The aux files don't always exists but when they do and one wants to rename, move or delete the
70/// repository, they should rename/move/delete these files as well.
71///
72/// Note ideally the aux files should be deleted automatically when the repo has been closed. But
73/// because of a [bug][1] in sqlx, they are sometimes not and need to be handled
74/// (move,deleted) manually. When the bug is fixed, this function can be removed.
75///
76/// [1]: https://github.com/launchbadge/sqlx/issues/3217
77pub fn database_files(store_path: impl AsRef<Path>) -> Vec<PathBuf> {
78    // Sqlite database consists of up to three files: main db (always present), WAL and WAL-index.
79    ["", "-wal", "-shm"]
80        .into_iter()
81        .map(|suffix| {
82            let mut path = store_path.as_ref().as_os_str().to_owned();
83            path.push(suffix);
84            path.into()
85        })
86        .collect()
87}
88
89impl Repository {
90    /// Creates a new repository.
91    pub async fn create(params: &RepositoryParams<impl Recorder>, access: Access) -> Result<Self> {
92        let pool = params.create().await?;
93        let device_id = params.device_id();
94        let monitor = params.monitor();
95
96        let mut tx = pool.begin_write().await?;
97
98        let local_keys = metadata::initialize_access_secrets(&mut tx, &access).await?;
99        let writer_id =
100            metadata::get_or_generate_writer_id(&mut tx, local_keys.write.as_deref()).await?;
101        metadata::set_device_id(&mut tx, &device_id).await?;
102
103        tx.commit().await?;
104
105        let credentials = Credentials {
106            secrets: access.secrets(),
107            writer_id,
108        };
109
110        Self::new(pool, credentials, monitor).init().await
111    }
112
113    /// Opens an existing repository.
114    pub async fn open(
115        params: &RepositoryParams<impl Recorder>,
116        local_secret: Option<LocalSecret>,
117        access_mode: AccessMode,
118    ) -> Result<Self> {
119        let pool = params.open().await?;
120        let monitor = params.monitor();
121        let device_id = params.device_id();
122
123        let mut tx = pool.begin_write().await?;
124
125        let (secrets, local_key) =
126            metadata::get_access_secrets(&mut tx, local_secret.as_ref()).await?;
127
128        let secrets = secrets.with_mode(access_mode);
129
130        let writer_id = if metadata::check_device_id(&mut tx, &device_id).await? {
131            if secrets.can_write() {
132                metadata::get_or_generate_writer_id(&mut tx, local_key.as_deref()).await?
133            } else {
134                metadata::generate_writer_id()
135            }
136        } else {
137            // Device id changed, likely because the repo database has been transferred to a
138            // different device. We need to generate a new writer id.
139            //
140            // Note we need to do this even when not currently opening the repo in write mode. This
141            // is so that when the access mode is subsequently switched to write
142            // (with [set_access_mode]) we don't end up using the wrong writer_id.
143            let writer_id = metadata::generate_writer_id();
144
145            metadata::set_device_id(&mut tx, &device_id).await?;
146            metadata::set_writer_id(&mut tx, &writer_id, local_key.as_deref()).await?;
147
148            writer_id
149        };
150
151        tx.commit().await?;
152
153        let credentials = Credentials { secrets, writer_id };
154
155        Self::new(pool, credentials, monitor).init().await
156    }
157
158    fn new(pool: db::Pool, credentials: Credentials, monitor: RepositoryMonitor) -> Self {
159        Self {
160            shared: Arc::new(Shared::new(pool, credentials, monitor)),
161            worker_handle: BlockingMutex::new(None),
162            progress_reporter_handle: BlockingMutex::new(None),
163        }
164    }
165
166    async fn init(self) -> Result<Self> {
167        let credentials = self.credentials();
168
169        if let Some(keys) = credentials
170            .secrets
171            .write_secrets()
172            .map(|secrets| &secrets.write_keys)
173        {
174            self.shared
175                .vault
176                .store()
177                .migrate_data(credentials.writer_id, keys)
178                .await?;
179        }
180
181        {
182            let mut conn = self.shared.vault.store().db().acquire().await?;
183            if let Some(block_expiration) = metadata::block_expiration::get(&mut conn).await? {
184                self.shared
185                    .vault
186                    .set_block_expiration(Some(block_expiration))?;
187            }
188        }
189
190        tracing::debug!(
191            parent: self.shared.vault.monitor.span(),
192            access = ?credentials.secrets.access_mode(),
193            writer_id = ?credentials.writer_id,
194            "Repository opened"
195        );
196
197        *self.worker_handle.lock().unwrap() = Some(spawn_worker(self.shared.clone()));
198
199        *self.progress_reporter_handle.lock().unwrap() = Some(scoped_task::spawn(
200            report_sync_progress(self.shared.vault.clone())
201                .instrument(self.shared.vault.monitor.span().clone()),
202        ));
203
204        Ok(self)
205    }
206
207    pub async fn database_id(&self) -> Result<DatabaseId> {
208        Ok(metadata::get_or_generate_database_id(self.db()).await?)
209    }
210
211    pub async fn requires_local_secret_for_reading(&self) -> Result<bool> {
212        let mut conn = self.db().acquire().await?;
213        Ok(metadata::requires_local_secret_for_reading(&mut conn).await?)
214    }
215
216    pub async fn requires_local_secret_for_writing(&self) -> Result<bool> {
217        let mut conn = self.db().acquire().await?;
218        Ok(metadata::requires_local_secret_for_writing(&mut conn).await?)
219    }
220
221    /// Sets, unsets or changes local secrets for accessing the repository or disables the given
222    /// access mode.
223    ///
224    /// In order to enable or change a given access mode the repository must currently be in at
225    /// least that access mode. Disabling an access mode doesn't have this restriction.
226    ///
227    /// Disabling a given access mode makes the repo impossible to be opened in that mode anymore.
228    /// However, when the repo is currently in that mode it still remains in it until the repo is
229    /// closed.
230    ///
231    /// To restore a disabled mode the repo must first be put into that mode using
232    /// [Self::set_credentials()] where the `Credentials` must be obtained from `AccessSecrets` with
233    /// at least the mode one wants to restore.
234    ///
235    /// If `read` or `write` is `None` then no change is made to that mode. If both are `None` then
236    /// this function is a no-op.
237    ///
238    /// Disabling the read mode while keeping write mode enabled is allowed but not very useful as
239    /// write mode also grants read access.
240    pub async fn set_access(
241        &self,
242        read_change: Option<AccessChange>,
243        write_change: Option<AccessChange>,
244    ) -> Result<()> {
245        let mut tx = self.db().begin_write().await?;
246
247        if let Some(change) = read_change {
248            self.set_read_access(&mut tx, change).await?;
249        }
250
251        if let Some(change) = write_change {
252            self.set_write_access(&mut tx, change).await?;
253        }
254
255        tx.commit().await?;
256
257        Ok(())
258    }
259
260    async fn set_read_access(
261        &self,
262        tx: &mut db::WriteTransaction,
263        change: AccessChange,
264    ) -> Result<()> {
265        let local = match &change {
266            AccessChange::Enable(Some(local_secret)) => {
267                Some(metadata::secret_to_key_and_salt(local_secret))
268            }
269            AccessChange::Enable(None) => None,
270            AccessChange::Disable => {
271                metadata::remove_read_key(tx).await?;
272                return Ok(());
273            }
274        };
275
276        let (id, read_key) = {
277            let cred = self.shared.credentials.read().unwrap();
278            (
279                *cred.secrets.id(),
280                cred.secrets
281                    .read_key()
282                    .ok_or(Error::PermissionDenied)?
283                    .clone(),
284            )
285        };
286
287        metadata::set_read_key(
288            tx,
289            &id,
290            &read_key,
291            local.as_ref().map(|(k, s)| (k.as_ref(), s.as_ref())),
292        )
293        .await?;
294
295        Ok(())
296    }
297
298    async fn set_write_access(
299        &self,
300        tx: &mut db::WriteTransaction,
301        change: AccessChange,
302    ) -> Result<()> {
303        let local = match &change {
304            AccessChange::Enable(Some(local_secret)) => {
305                Some(metadata::secret_to_key_and_salt(local_secret))
306            }
307            AccessChange::Enable(None) => None,
308            AccessChange::Disable => {
309                metadata::remove_write_key(tx).await?;
310                return Ok(());
311            }
312        };
313
314        let (write_secrets, writer_id) = {
315            let cred = self.shared.credentials.read().unwrap();
316            (
317                cred.secrets
318                    .write_secrets()
319                    .ok_or(Error::PermissionDenied)?
320                    .clone(),
321                cred.writer_id,
322            )
323        };
324
325        metadata::set_write_key(
326            tx,
327            &write_secrets,
328            local.as_ref().map(|(k, s)| (k.as_ref(), s.as_ref())),
329        )
330        .await?;
331        metadata::set_writer_id(tx, &writer_id, local.as_ref().map(|(k, _)| k.as_ref())).await?;
332
333        Ok(())
334    }
335
336    /// Gets the current credentials of this repository.
337    ///
338    /// See also [Self::set_credentials()].
339    pub fn credentials(&self) -> Credentials {
340        self.shared.credentials.read().unwrap().clone()
341    }
342
343    pub fn secrets(&self) -> AccessSecrets {
344        self.shared.credentials.read().unwrap().secrets.clone()
345    }
346
347    /// Gets the current access mode of this repository.
348    pub fn access_mode(&self) -> AccessMode {
349        self.shared
350            .credentials
351            .read()
352            .unwrap()
353            .secrets
354            .access_mode()
355    }
356
357    /// Switches the repository to the given mode.
358    ///
359    /// The actual mode the repository gets switched to is the higher of the current access mode
360    /// and the mode provided by `local_key` but at most the mode specified in `access_mode`
361    /// ("higher" means according to `AccessMode`'s `Ord` impl, that is: Write > Read > Blind).
362    pub async fn set_access_mode(
363        &self,
364        access_mode: AccessMode,
365        local_secret: Option<LocalSecret>,
366    ) -> Result<()> {
367        // Try to use the current secrets but fall back to the secrets stored in the metadata if the
368        // current ones are insufficient and the stored ones have higher access mode.
369        let old_secrets = {
370            let creds = self.shared.credentials.read().unwrap();
371
372            if creds.secrets.access_mode() == access_mode {
373                return Ok(());
374            }
375
376            creds.secrets.clone()
377        };
378
379        let mut tx = self.db().begin_write().await?;
380
381        let (secrets, local_key) = if old_secrets.access_mode() >= access_mode {
382            (old_secrets, None)
383        } else {
384            let (new_secrets, local_key) =
385                metadata::get_access_secrets(&mut tx, local_secret.as_ref()).await?;
386
387            if new_secrets.access_mode() > old_secrets.access_mode() {
388                (new_secrets, local_key)
389            } else {
390                (old_secrets, None)
391            }
392        };
393
394        let secrets = secrets.with_mode(access_mode);
395
396        let writer_id = if secrets.can_write() {
397            metadata::get_or_generate_writer_id(&mut tx, local_key.as_deref()).await?
398        } else {
399            metadata::generate_writer_id()
400        };
401
402        tx.commit().await?;
403
404        // Data migration can only be applied in write mode so apply any pending ones if we just
405        // switched to write mode.
406        if let Some(write_keys) = secrets.write_secrets().map(|secrets| &secrets.write_keys) {
407            self.shared
408                .vault
409                .store()
410                .migrate_data(writer_id, write_keys)
411                .await?;
412        }
413
414        self.update_credentials(Credentials { secrets, writer_id });
415
416        Ok(())
417    }
418
419    /// Overrides the current credentials of this repository.
420    ///
421    /// This is useful for moving/renaming the repo database or to restore access which has been
422    /// either disabled or it's local secret lost.
423    ///
424    /// # Move/rename the repo db
425    ///
426    /// 1. Obtain the current credentials with [Self::credentials()] and keep them locally.
427    /// 2. Close the repo.
428    /// 3. Rename the repo database files(s).
429    /// 4. Open the repo from its new location in blind mode.
430    /// 5. Restore the credentials from step 1 with [Self::set_credentials()].
431    ///
432    /// # Restore access
433    ///
434    /// 1. Get the `AccessSecrets` the repository was originally created from (e.g., by extracting
435    ///    them from the original `ShareToken`).
436    /// 2. Construct `Credentials` using this access secrets and a random writer id.
437    /// 3. Restore the credentials with [Self::set_credentials()].
438    /// 4. Enable/change the access with [Self::set_access()].
439    pub async fn set_credentials(&self, credentials: Credentials) -> Result<()> {
440        // Check the credentials are actually for this repository
441        let expected_id = {
442            let mut conn = self.db().acquire().await?;
443            metadata::get_repository_id(&mut conn).await?
444        };
445
446        if credentials.secrets.id() != &expected_id {
447            return Err(Error::PermissionDenied);
448        }
449
450        if let Some(write_secrets) = credentials.secrets.write_secrets() {
451            self.shared
452                .vault
453                .store()
454                .migrate_data(credentials.writer_id, &write_secrets.write_keys)
455                .await?;
456        }
457
458        self.update_credentials(credentials);
459
460        Ok(())
461    }
462
463    pub async fn unlock_secrets(&self, local_secret: LocalSecret) -> Result<AccessSecrets> {
464        let mut tx = self.db().begin_write().await?;
465        Ok(metadata::get_access_secrets(&mut tx, Some(&local_secret))
466            .await?
467            .0)
468    }
469
470    /// Get accessor for repository metadata. The metadata are arbitrary key-value entries that are
471    /// stored inside the repository but not synced to other replicas.
472    pub fn metadata(&self) -> Metadata {
473        self.shared.vault.metadata()
474    }
475
476    /// Set the storage quota in bytes. Use `None` to disable quota. Default is `None`.
477    pub async fn set_quota(&self, quota: Option<StorageSize>) -> Result<()> {
478        self.shared.vault.set_quota(quota).await
479    }
480
481    /// Get the storage quota in bytes or `None` if no quota is set.
482    pub async fn quota(&self) -> Result<Option<StorageSize>> {
483        self.shared.vault.quota().await
484    }
485
486    /// Set the duration after which blocks start to expire (are deleted) when not used. Use `None`
487    /// to disable expiration. Default is `None`.
488    pub async fn set_block_expiration(&self, block_expiration: Option<Duration>) -> Result<()> {
489        let mut tx = self.db().begin_write().await?;
490        metadata::block_expiration::set(&mut tx, block_expiration).await?;
491        tx.commit().await?;
492
493        self.shared.vault.set_block_expiration(block_expiration)
494    }
495
496    /// Get the block expiration duration. `None` means block expiration is not set.
497    pub fn block_expiration(&self) -> Option<Duration> {
498        self.shared.vault.block_expiration()
499    }
500
501    /// Get the time when the last block expired or `None` if there are still some unexpired blocks.
502    /// If block expiration is not enabled, always return `None`.
503    pub fn last_block_expiration_time(&self) -> Option<SystemTime> {
504        self.shared.vault.last_block_expiration_time()
505    }
506
507    /// Get the total size of the data stored in this repository.
508    pub async fn size(&self) -> Result<StorageSize> {
509        self.shared.vault.size().await
510    }
511
512    pub fn handle(&self) -> RepositoryHandle {
513        RepositoryHandle {
514            vault: self.shared.vault.clone(),
515        }
516    }
517
518    pub async fn get_read_password_salt(&self) -> Result<PasswordSalt> {
519        let mut tx = self.db().begin_write().await?;
520        Ok(metadata::get_password_salt(&mut tx, metadata::KeyType::Read).await?)
521    }
522
523    pub async fn get_write_password_salt(&self) -> Result<PasswordSalt> {
524        let mut tx = self.db().begin_write().await?;
525        Ok(metadata::get_password_salt(&mut tx, metadata::KeyType::Write).await?)
526    }
527
528    /// Get the state monitor node of this repository.
529    pub fn monitor(&self) -> &StateMonitor {
530        self.shared.vault.monitor.node()
531    }
532
533    /// Export the repository to the given file.
534    ///
535    /// The repository is currently exported as read-only with no password. In the future other
536    /// modes might be added.
537    pub async fn export(&self, dst: &Path) -> Result<()> {
538        /// RAII to delete the exported repo in case the process fails or is interupted to avoid
539        /// exporting the repo with a wrong access mode.
540        struct Cleanup<'a> {
541            path: &'a Path,
542            armed: bool,
543        }
544
545        impl Drop for Cleanup<'_> {
546            fn drop(&mut self) {
547                if !self.armed {
548                    return;
549                }
550
551                if let Err(error) = std::fs::remove_file(self.path) {
552                    tracing::error!(
553                        path = ?self.path,
554                        ?error,
555                        "failed to delete partially exported repository",
556                    );
557                }
558            }
559        }
560
561        let mut cleanup = Cleanup {
562            path: dst,
563            armed: true,
564        };
565
566        // Export the repo to `dst`
567        self.shared.vault.store().export(dst).await?;
568
569        // Open it and strip write access and read password (if any).
570        let pool = db::open(dst).await?;
571        let credentials = self.credentials().with_mode(AccessMode::Read);
572        let access_mode = credentials.secrets.access_mode();
573        let monitor = RepositoryMonitor::new(StateMonitor::make_root(), &NoopRecorder);
574        let repo = Self::new(pool, credentials, monitor);
575
576        match access_mode {
577            AccessMode::Blind => {
578                repo.set_access(Some(AccessChange::Disable), Some(AccessChange::Disable))
579                    .await?
580            }
581            AccessMode::Read => {
582                repo.set_access(
583                    Some(AccessChange::Enable(None)),
584                    Some(AccessChange::Disable),
585                )
586                .await?
587            }
588            AccessMode::Write => unreachable!(),
589        }
590
591        repo.close().await?;
592
593        cleanup.armed = false;
594
595        Ok(())
596    }
597
598    /// Looks up an entry by its path. The path must be relative to the repository root.
599    /// If the entry exists, returns its `EntryType`, otherwise returns `EntryNotFound`.
600    pub async fn lookup_type<P: AsRef<Utf8Path>>(&self, path: P) -> Result<EntryType> {
601        match path::decompose(path.as_ref()) {
602            Some((parent, name)) => {
603                let parent = self.open_directory(parent).await?;
604                Ok(parent.lookup_unique(name)?.entry_type())
605            }
606            None => Ok(EntryType::Directory),
607        }
608    }
609
610    /// Opens a file at the given path (relative to the repository root)
611    pub async fn open_file<P: AsRef<Utf8Path>>(&self, path: P) -> Result<File> {
612        let (parent, name) = path::decompose(path.as_ref()).ok_or(Error::EntryIsDirectory)?;
613
614        self.cd(parent)
615            .await?
616            .lookup_unique(name)?
617            .file()?
618            .open()
619            .await
620    }
621
622    /// Open a specific version of the file at the given path.
623    pub async fn open_file_version<P: AsRef<Utf8Path>>(
624        &self,
625        path: P,
626        branch_id: &PublicKey,
627    ) -> Result<File> {
628        let (parent, name) = path::decompose(path.as_ref()).ok_or(Error::EntryIsDirectory)?;
629
630        self.cd(parent)
631            .await?
632            .lookup_version(name, branch_id)?
633            .open()
634            .await
635    }
636
637    /// Opens a directory at the given path (relative to the repository root)
638    pub async fn open_directory<P: AsRef<Utf8Path>>(&self, path: P) -> Result<JointDirectory> {
639        self.cd(path).await
640    }
641
642    /// Creates a new file at the given path.
643    pub async fn create_file<P: AsRef<Utf8Path>>(&self, path: P) -> Result<File> {
644        let file = self
645            .local_branch()?
646            .ensure_file_exists(path.as_ref())
647            .await?;
648
649        Ok(file)
650    }
651
652    /// Creates a new directory at the given path.
653    pub async fn create_directory<P: AsRef<Utf8Path>>(&self, path: P) -> Result<Directory> {
654        let dir = self
655            .local_branch()?
656            .ensure_directory_exists(path.as_ref())
657            .await?;
658
659        Ok(dir)
660    }
661
662    /// Removes the file or directory (must be empty) and flushes its parent directory.
663    pub async fn remove_entry<P: AsRef<Utf8Path>>(&self, path: P) -> Result<()> {
664        let (parent, name) = path::decompose(path.as_ref()).ok_or(Error::OperationNotSupported)?;
665        let mut parent = self.cd(parent).await?;
666        parent.remove_entry(name).await?;
667
668        Ok(())
669    }
670
671    /// Removes the file or directory (including its content) and flushes its parent directory.
672    pub async fn remove_entry_recursively<P: AsRef<Utf8Path>>(&self, path: P) -> Result<()> {
673        let (parent, name) = path::decompose(path.as_ref()).ok_or(Error::OperationNotSupported)?;
674        let mut parent = self.cd(parent).await?;
675        parent.remove_entry_recursively(name).await?;
676
677        Ok(())
678    }
679
680    /// Moves (renames) an entry from the source path to the destination path.
681    /// If both source and destination refer to the same entry, this is a no-op.
682    pub async fn move_entry<S: AsRef<Utf8Path>, D: AsRef<Utf8Path>>(
683        &self,
684        src: S,
685        dst: D,
686    ) -> Result<()> {
687        let (src_dir_path, src_name) = path::decompose(src.as_ref()).ok_or(Error::EntryNotFound)?;
688        let (dst_dir_path, dst_name) = path::decompose(dst.as_ref()).ok_or(Error::EntryNotFound)?;
689
690        let local_branch = self.local_branch()?;
691        let src_joint_dir = self.cd(src_dir_path).await?;
692
693        // If the src is in a remote branch, need to merge it into the local one first:
694        let (mut src_dir, src_name, src_type) = match src_joint_dir.lookup_unique(src_name)? {
695            JointEntryRef::File(entry) => {
696                let src_name = entry.name().to_string();
697
698                let mut file = entry.open().await?;
699                file.fork(local_branch.clone()).await?;
700
701                (file.parent().await?, Cow::Owned(src_name), EntryType::File)
702            }
703            JointEntryRef::Directory(entry) => {
704                let mut dir_to_move = entry
705                    .open_with(MissingVersionStrategy::Skip, DirectoryFallback::Disabled)
706                    .await?;
707                let dir_to_move = dir_to_move.merge().await?;
708
709                let src_dir = dir_to_move
710                    .parent()
711                    .await?
712                    .ok_or(Error::OperationNotSupported /* can't move root */)?;
713
714                (src_dir, Cow::Borrowed(src_name), EntryType::Directory)
715            }
716        };
717
718        let src_entry = src_dir.lookup(&src_name)?.clone_data();
719
720        let mut dst_joint_dir = self.cd(&dst_dir_path).await?;
721        let dst_dir = dst_joint_dir
722            .local_version_mut()
723            .ok_or(Error::PermissionDenied)?;
724
725        let dst_old_entry = dst_dir.lookup(dst_name);
726
727        // Emulating the behaviour of the libc's `rename` function
728        // (https://www.man7.org/linux/man-pages/man2/rename.2.html)
729        let dst_old_vv = match (src_type, dst_old_entry) {
730            (EntryType::File | EntryType::Directory, Ok(EntryRef::Tombstone(old_entry))) => {
731                old_entry.version_vector().clone()
732            }
733            (EntryType::File | EntryType::Directory, Err(Error::EntryNotFound)) => {
734                VersionVector::new()
735            }
736            (EntryType::File | EntryType::Directory, Err(error)) => return Err(error),
737            (EntryType::File, Ok(EntryRef::File(old_entry))) => old_entry.version_vector().clone(),
738            (EntryType::Directory, Ok(EntryRef::Directory(old_entry))) => {
739                if old_entry
740                    .open(DirectoryFallback::Disabled)
741                    .await?
742                    .entries()
743                    .all(|entry| entry.is_tombstone())
744                {
745                    old_entry.version_vector().clone()
746                } else {
747                    return Err(Error::DirectoryNotEmpty);
748                }
749            }
750            (EntryType::File, Ok(EntryRef::Directory(_))) => return Err(Error::EntryIsDirectory),
751            (EntryType::Directory, Ok(EntryRef::File(_))) => return Err(Error::EntryIsFile),
752        };
753
754        let dst_vv = dst_old_vv
755            .merged(src_entry.version_vector())
756            .incremented(*local_branch.id());
757
758        src_dir
759            .move_entry(&src_name, src_entry, dst_dir, dst_name, dst_vv)
760            .await?;
761
762        Ok(())
763    }
764
765    /// Returns the local branch or `Error::PermissionDenied` if this repo doesn't have at least
766    /// read access.
767    pub fn local_branch(&self) -> Result<Branch> {
768        self.shared.local_branch()
769    }
770
771    /// Returns the branch corresponding to the given id or `Error::PermissionDenied. if this repo
772    /// doesn't have at least read access.
773    #[cfg(test)]
774    pub fn get_branch(&self, id: PublicKey) -> Result<Branch> {
775        self.shared.get_branch(id)
776    }
777
778    /// Returns version vector of the given branch. Works in all access moded.
779    pub async fn get_branch_version_vector(&self, writer_id: &PublicKey) -> Result<VersionVector> {
780        Ok(self
781            .shared
782            .vault
783            .store()
784            .acquire_read()
785            .await?
786            .load_latest_approved_root_node(writer_id, RootNodeFilter::Any)
787            .await?
788            .proof
789            .into_version_vector())
790    }
791
792    /// Returns the version vector calculated by merging the version vectors of all branches.
793    pub async fn get_merged_version_vector(&self) -> Result<VersionVector> {
794        Ok(self
795            .shared
796            .vault
797            .store()
798            .acquire_read()
799            .await?
800            .load_latest_approved_root_nodes()
801            .try_fold(VersionVector::default(), |mut merged, node| {
802                merged.merge(&node.proof.version_vector);
803                future::ready(Ok(merged))
804            })
805            .await?)
806    }
807
808    /// Subscribe to event notifications.
809    pub fn subscribe(&self) -> broadcast::Receiver<Event> {
810        self.shared.vault.event_tx.subscribe()
811    }
812
813    /// Gets the syncing progress of this repository (number of downloaded blocks / number of
814    /// all blocks)
815    pub async fn sync_progress(&self) -> Result<Progress> {
816        Ok(self.shared.vault.store().sync_progress().await?)
817    }
818
819    /// Check integrity of the stored data.
820    // TODO: Return more detailed info about any integrity violation.
821    pub async fn check_integrity(&self) -> Result<bool> {
822        Ok(self.shared.vault.store().check_integrity().await?)
823    }
824
825    // Opens the root directory across all branches as JointDirectory.
826    async fn root(&self) -> Result<JointDirectory> {
827        let local_branch = self.local_branch()?;
828        let branches = self.shared.load_branches().await?;
829
830        // If we are writer and the local branch doesn't exist yet in the db we include it anyway.
831        // This fixes a race condition when the local branch doesn't exist yet at the moment we
832        // load the branches but is subsequently created by merging a remote branch and the remote
833        // branch is then pruned.
834        let branches = if local_branch.keys().write().is_some()
835            && branches
836                .iter()
837                .all(|branch| branch.id() != local_branch.id())
838        {
839            let mut branches = branches;
840            branches.push(local_branch.clone());
841            branches
842        } else {
843            branches
844        };
845
846        let mut dirs = Vec::new();
847
848        for branch in branches {
849            let dir = match branch
850                .open_root(DirectoryLocking::Enabled, DirectoryFallback::Enabled)
851                .await
852            {
853                Ok(dir) => dir,
854                Err(error @ Error::Store(store::Error::BranchNotFound)) => {
855                    tracing::trace!(
856                        branch_id = ?branch.id(),
857                        ?error,
858                        "Failed to open root directory"
859                    );
860                    // Either this is the local branch which doesn't exist yet in the store or a
861                    // remote branch which has been pruned in the meantime. This is safe to ignore.
862                    continue;
863                }
864                Err(error @ Error::Store(store::Error::BlockNotFound)) => {
865                    tracing::trace!(
866                        branch_id = ?branch.id(),
867                        ?error,
868                        "Failed to open root directory"
869                    );
870                    // Some branch root blocks may not have been loaded across the network yet.
871                    // This is safe to ignore.
872                    continue;
873                }
874                Err(error) => {
875                    tracing::error!(
876                        branch_id = ?branch.id(),
877                        ?error,
878                        "Failed to open root directory"
879                    );
880                    return Err(error);
881                }
882            };
883
884            dirs.push(dir);
885        }
886
887        Ok(JointDirectory::new(Some(local_branch), dirs))
888    }
889
890    pub async fn cd<P: AsRef<Utf8Path>>(&self, path: P) -> Result<JointDirectory> {
891        self.root().await?.cd(path).await
892    }
893
894    /// Close all db connections held by this repository. After this function returns, any
895    /// subsequent operation on this repository that requires to access the db returns an error.
896    pub async fn close(&self) -> Result<()> {
897        // Abort and *await* the tasks to make sure that the state they are holding is definitely
898        // dropped before we return from this function.
899        for task in [&self.worker_handle, &self.progress_reporter_handle] {
900            let task = task.lock().unwrap().take();
901            if let Some(task) = task {
902                task.abort();
903                task.await.ok();
904            }
905        }
906
907        self.shared.vault.store().close().await?;
908
909        Ok(())
910    }
911
912    pub async fn debug_print_root(&self) {
913        self.debug_print(DebugPrinter::new()).await
914    }
915
916    pub async fn debug_print(&self, print: DebugPrinter) {
917        print.display(&"Repository");
918
919        let branches = match self.shared.load_branches().await {
920            Ok(branches) => branches,
921            Err(error) => {
922                print.display(&format_args!("failed to load branches: {:?}", error));
923                return;
924            }
925        };
926
927        let writer_id = self.shared.credentials.read().unwrap().writer_id;
928
929        for branch in branches {
930            let print = print.indent();
931            let local = if branch.id() == &writer_id {
932                " (local)"
933            } else {
934                ""
935            };
936            print.display(&format_args!(
937                "Branch ID: {:?}{}, root block ID:{:?}",
938                branch.id(),
939                local,
940                branch.root_block_id().await
941            ));
942            let print = print.indent();
943            print.display(&format_args!(
944                "/, vv: {:?}",
945                branch.version_vector().await.unwrap_or_default()
946            ));
947            branch.debug_print(print.indent()).await;
948        }
949
950        print.display(&"Index");
951        let print = print.indent();
952        self.shared.vault.debug_print(print).await;
953    }
954
955    /// Returns the total number of blocks in this repository. This is useful for diagnostics and
956    /// tests.
957    pub async fn count_blocks(&self) -> Result<u64> {
958        Ok(self.shared.vault.store().count_blocks().await?)
959    }
960
961    fn db(&self) -> &db::Pool {
962        self.shared.vault.store().db()
963    }
964
965    fn update_credentials(&self, credentials: Credentials) {
966        tracing::debug!(
967            parent: self.shared.vault.monitor.span(),
968            access = ?credentials.secrets.access_mode(),
969            writer_id = ?credentials.writer_id,
970            "Repository access mode changed"
971        );
972
973        self.shared
974            .vault
975            .block_tracker
976            .set_request_mode(block_request_mode(&credentials.secrets));
977
978        *self.shared.credentials.write().unwrap() = credentials;
979        *self.worker_handle.lock().unwrap() = Some(spawn_worker(self.shared.clone()));
980    }
981}
982
983pub struct RepositoryHandle {
984    pub(crate) vault: Vault,
985}
986
987struct Shared {
988    vault: Vault,
989    credentials: BlockingRwLock<Credentials>,
990    branch_shared: BranchShared,
991}
992
993impl Shared {
994    fn new(pool: db::Pool, credentials: Credentials, monitor: RepositoryMonitor) -> Self {
995        let event_tx = EventSender::new(EVENT_CHANNEL_CAPACITY);
996        let vault = Vault::new(*credentials.secrets.id(), event_tx, pool, monitor);
997
998        vault
999            .block_tracker
1000            .set_request_mode(block_request_mode(&credentials.secrets));
1001
1002        Self {
1003            vault,
1004            credentials: BlockingRwLock::new(credentials),
1005            branch_shared: BranchShared::new(),
1006        }
1007    }
1008
1009    pub fn local_branch(&self) -> Result<Branch> {
1010        let credentials = self.credentials.read().unwrap();
1011
1012        Ok(self.make_branch(
1013            credentials.writer_id,
1014            credentials.secrets.keys().ok_or(Error::PermissionDenied)?,
1015        ))
1016    }
1017
1018    pub fn get_branch(&self, id: PublicKey) -> Result<Branch> {
1019        let credentials = self.credentials.read().unwrap();
1020        let keys = credentials.secrets.keys().ok_or(Error::PermissionDenied)?;
1021
1022        // Only the local branch is writable.
1023        let keys = if id == credentials.writer_id {
1024            keys
1025        } else {
1026            keys.read_only()
1027        };
1028
1029        Ok(self.make_branch(id, keys))
1030    }
1031
1032    fn make_branch(&self, id: PublicKey, keys: AccessKeys) -> Branch {
1033        Branch::new(
1034            id,
1035            self.vault.store().clone(),
1036            keys,
1037            self.branch_shared.clone(),
1038            self.vault.event_tx.clone(),
1039        )
1040    }
1041
1042    pub async fn load_branches(&self) -> Result<Vec<Branch>> {
1043        self.vault
1044            .store()
1045            .acquire_read()
1046            .await?
1047            .load_latest_approved_root_nodes()
1048            .err_into()
1049            .and_then(|root_node| future::ready(self.get_branch(root_node.proof.writer_id)))
1050            .try_collect()
1051            .await
1052    }
1053}
1054
1055fn spawn_worker(shared: Arc<Shared>) -> ScopedJoinHandle<()> {
1056    let span = shared.vault.monitor.span().clone();
1057    scoped_task::spawn(worker::run(shared).instrument(span))
1058}
1059
1060async fn report_sync_progress(vault: Vault) {
1061    let mut prev_progress = Progress { value: 0, total: 0 };
1062
1063    let events = stream::unfold(vault.event_tx.subscribe(), |mut rx| async move {
1064        match rx.recv().await {
1065            Ok(_) | Err(RecvError::Lagged(_)) => Some(((), rx)),
1066            Err(RecvError::Closed) => None,
1067        }
1068    });
1069    let events = Throttle::new(events, Duration::from_secs(1));
1070    let mut events = pin!(events);
1071
1072    while events.next().await.is_some() {
1073        let next_progress = match vault.store().sync_progress().await {
1074            Ok(progress) => progress,
1075            Err(error) => {
1076                tracing::error!("Failed to retrieve sync progress: {:?}", error);
1077                continue;
1078            }
1079        };
1080
1081        if next_progress != prev_progress {
1082            prev_progress = next_progress;
1083            tracing::debug!(
1084                "Sync progress: {} bytes ({:.1})",
1085                prev_progress * BLOCK_SIZE as u64,
1086                prev_progress.percent()
1087            );
1088        }
1089    }
1090}
1091
1092fn block_request_mode(secrets: &AccessSecrets) -> BlockRequestMode {
1093    if secrets.can_read() {
1094        BlockRequestMode::Lazy
1095    } else {
1096        BlockRequestMode::Greedy
1097    }
1098}