1pub(crate) mod monitor;
2
3mod credentials;
4mod metadata;
5mod params;
6mod vault;
7mod worker;
8
9#[cfg(test)]
10mod tests;
11
12pub use self::{credentials::Credentials, metadata::Metadata, params::RepositoryParams};
13
14pub(crate) use self::{
15 metadata::{data_version, quota},
16 vault::Vault,
17};
18
19use self::monitor::RepositoryMonitor;
20use crate::{
21 access_control::{Access, AccessChange, AccessKeys, AccessMode, AccessSecrets, LocalSecret},
22 block_tracker::BlockRequestMode,
23 branch::{Branch, BranchShared},
24 crypto::{sign::PublicKey, PasswordSalt},
25 db::{self, DatabaseId},
26 debug::DebugPrinter,
27 directory::{Directory, DirectoryFallback, DirectoryLocking, EntryRef, EntryType},
28 error::{Error, Result},
29 event::{Event, EventSender},
30 file::File,
31 joint_directory::{JointDirectory, JointEntryRef, MissingVersionStrategy},
32 path,
33 progress::Progress,
34 protocol::{RootNodeFilter, StorageSize, BLOCK_SIZE},
35 store,
36 sync::stream::Throttle,
37 version_vector::VersionVector,
38};
39use camino::Utf8Path;
40use deadlock::{BlockingMutex, BlockingRwLock};
41use futures_util::{future, TryStreamExt};
42use futures_util::{stream, StreamExt};
43use metrics::{NoopRecorder, Recorder};
44use scoped_task::ScopedJoinHandle;
45use state_monitor::StateMonitor;
46use std::{
47 borrow::Cow,
48 path::{Path, PathBuf},
49 pin::pin,
50 sync::Arc,
51 time::SystemTime,
52};
53use tokio::{
54 sync::broadcast::{self, error::RecvError},
55 time::Duration,
56};
57use tracing::instrument::Instrument;
58
/// Capacity handed to `EventSender::new` when `Shared::new` creates the repository's event
/// channel.
59const EVENT_CHANNEL_CAPACITY: usize = 10000;
60
/// A repository together with the background tasks that serve it.
61pub struct Repository {
// State shared with the background worker (a clone of this `Arc` is passed to `spawn_worker`).
62 shared: Arc<Shared>,
// Handle of the task started by `spawn_worker`; taken and aborted in `close`, replaced in
// `update_credentials`.
63 worker_handle: BlockingMutex<Option<ScopedJoinHandle<()>>>,
// Handle of the task running `report_sync_progress`; taken and aborted in `close`.
64 progress_reporter_handle: BlockingMutex<Option<ScopedJoinHandle<()>>>,
65}
66
/// Lists the paths obtained by appending each of the suffixes `""`, `"-wal"` and `"-shm"` to
/// `store_path`, in that order.
///
/// The suffix is appended to the raw OS string of the path, so no path separator is inserted
/// and the original file extension (if any) is preserved.
pub fn database_files(store_path: impl AsRef<Path>) -> Vec<PathBuf> {
    let base = store_path.as_ref().as_os_str();
    let mut files = Vec::new();

    for suffix in ["", "-wal", "-shm"] {
        let mut name = base.to_owned();
        name.push(suffix);
        files.push(PathBuf::from(name));
    }

    files
}
88
89impl Repository {
90 pub async fn create(params: &RepositoryParams<impl Recorder>, access: Access) -> Result<Self> {
92 let pool = params.create().await?;
93 let device_id = params.device_id();
94 let monitor = params.monitor();
95
96 let mut tx = pool.begin_write().await?;
97
98 let local_keys = metadata::initialize_access_secrets(&mut tx, &access).await?;
99 let writer_id =
100 metadata::get_or_generate_writer_id(&mut tx, local_keys.write.as_deref()).await?;
101 metadata::set_device_id(&mut tx, &device_id).await?;
102
103 tx.commit().await?;
104
105 let credentials = Credentials {
106 secrets: access.secrets(),
107 writer_id,
108 };
109
110 Self::new(pool, credentials, monitor).init().await
111 }
112
113 pub async fn open(
115 params: &RepositoryParams<impl Recorder>,
116 local_secret: Option<LocalSecret>,
117 access_mode: AccessMode,
118 ) -> Result<Self> {
119 let pool = params.open().await?;
120 let monitor = params.monitor();
121 let device_id = params.device_id();
122
123 let mut tx = pool.begin_write().await?;
124
125 let (secrets, local_key) =
126 metadata::get_access_secrets(&mut tx, local_secret.as_ref()).await?;
127
128 let secrets = secrets.with_mode(access_mode);
129
130 let writer_id = if metadata::check_device_id(&mut tx, &device_id).await? {
131 if secrets.can_write() {
132 metadata::get_or_generate_writer_id(&mut tx, local_key.as_deref()).await?
133 } else {
134 metadata::generate_writer_id()
135 }
136 } else {
137 let writer_id = metadata::generate_writer_id();
144
145 metadata::set_device_id(&mut tx, &device_id).await?;
146 metadata::set_writer_id(&mut tx, &writer_id, local_key.as_deref()).await?;
147
148 writer_id
149 };
150
151 tx.commit().await?;
152
153 let credentials = Credentials { secrets, writer_id };
154
155 Self::new(pool, credentials, monitor).init().await
156 }
157
158 fn new(pool: db::Pool, credentials: Credentials, monitor: RepositoryMonitor) -> Self {
159 Self {
160 shared: Arc::new(Shared::new(pool, credentials, monitor)),
161 worker_handle: BlockingMutex::new(None),
162 progress_reporter_handle: BlockingMutex::new(None),
163 }
164 }
165
166 async fn init(self) -> Result<Self> {
167 let credentials = self.credentials();
168
169 if let Some(keys) = credentials
170 .secrets
171 .write_secrets()
172 .map(|secrets| &secrets.write_keys)
173 {
174 self.shared
175 .vault
176 .store()
177 .migrate_data(credentials.writer_id, keys)
178 .await?;
179 }
180
181 {
182 let mut conn = self.shared.vault.store().db().acquire().await?;
183 if let Some(block_expiration) = metadata::block_expiration::get(&mut conn).await? {
184 self.shared
185 .vault
186 .set_block_expiration(Some(block_expiration))?;
187 }
188 }
189
190 tracing::debug!(
191 parent: self.shared.vault.monitor.span(),
192 access = ?credentials.secrets.access_mode(),
193 writer_id = ?credentials.writer_id,
194 "Repository opened"
195 );
196
197 *self.worker_handle.lock().unwrap() = Some(spawn_worker(self.shared.clone()));
198
199 *self.progress_reporter_handle.lock().unwrap() = Some(scoped_task::spawn(
200 report_sync_progress(self.shared.vault.clone())
201 .instrument(self.shared.vault.monitor.span().clone()),
202 ));
203
204 Ok(self)
205 }
206
207 pub async fn database_id(&self) -> Result<DatabaseId> {
208 Ok(metadata::get_or_generate_database_id(self.db()).await?)
209 }
210
211 pub async fn requires_local_secret_for_reading(&self) -> Result<bool> {
212 let mut conn = self.db().acquire().await?;
213 Ok(metadata::requires_local_secret_for_reading(&mut conn).await?)
214 }
215
216 pub async fn requires_local_secret_for_writing(&self) -> Result<bool> {
217 let mut conn = self.db().acquire().await?;
218 Ok(metadata::requires_local_secret_for_writing(&mut conn).await?)
219 }
220
221 pub async fn set_access(
241 &self,
242 read_change: Option<AccessChange>,
243 write_change: Option<AccessChange>,
244 ) -> Result<()> {
245 let mut tx = self.db().begin_write().await?;
246
247 if let Some(change) = read_change {
248 self.set_read_access(&mut tx, change).await?;
249 }
250
251 if let Some(change) = write_change {
252 self.set_write_access(&mut tx, change).await?;
253 }
254
255 tx.commit().await?;
256
257 Ok(())
258 }
259
260 async fn set_read_access(
261 &self,
262 tx: &mut db::WriteTransaction,
263 change: AccessChange,
264 ) -> Result<()> {
265 let local = match &change {
266 AccessChange::Enable(Some(local_secret)) => {
267 Some(metadata::secret_to_key_and_salt(local_secret))
268 }
269 AccessChange::Enable(None) => None,
270 AccessChange::Disable => {
271 metadata::remove_read_key(tx).await?;
272 return Ok(());
273 }
274 };
275
276 let (id, read_key) = {
277 let cred = self.shared.credentials.read().unwrap();
278 (
279 *cred.secrets.id(),
280 cred.secrets
281 .read_key()
282 .ok_or(Error::PermissionDenied)?
283 .clone(),
284 )
285 };
286
287 metadata::set_read_key(
288 tx,
289 &id,
290 &read_key,
291 local.as_ref().map(|(k, s)| (k.as_ref(), s.as_ref())),
292 )
293 .await?;
294
295 Ok(())
296 }
297
298 async fn set_write_access(
299 &self,
300 tx: &mut db::WriteTransaction,
301 change: AccessChange,
302 ) -> Result<()> {
303 let local = match &change {
304 AccessChange::Enable(Some(local_secret)) => {
305 Some(metadata::secret_to_key_and_salt(local_secret))
306 }
307 AccessChange::Enable(None) => None,
308 AccessChange::Disable => {
309 metadata::remove_write_key(tx).await?;
310 return Ok(());
311 }
312 };
313
314 let (write_secrets, writer_id) = {
315 let cred = self.shared.credentials.read().unwrap();
316 (
317 cred.secrets
318 .write_secrets()
319 .ok_or(Error::PermissionDenied)?
320 .clone(),
321 cred.writer_id,
322 )
323 };
324
325 metadata::set_write_key(
326 tx,
327 &write_secrets,
328 local.as_ref().map(|(k, s)| (k.as_ref(), s.as_ref())),
329 )
330 .await?;
331 metadata::set_writer_id(tx, &writer_id, local.as_ref().map(|(k, _)| k.as_ref())).await?;
332
333 Ok(())
334 }
335
336 pub fn credentials(&self) -> Credentials {
340 self.shared.credentials.read().unwrap().clone()
341 }
342
343 pub fn secrets(&self) -> AccessSecrets {
344 self.shared.credentials.read().unwrap().secrets.clone()
345 }
346
347 pub fn access_mode(&self) -> AccessMode {
349 self.shared
350 .credentials
351 .read()
352 .unwrap()
353 .secrets
354 .access_mode()
355 }
356
357 pub async fn set_access_mode(
363 &self,
364 access_mode: AccessMode,
365 local_secret: Option<LocalSecret>,
366 ) -> Result<()> {
367 let old_secrets = {
370 let creds = self.shared.credentials.read().unwrap();
371
372 if creds.secrets.access_mode() == access_mode {
373 return Ok(());
374 }
375
376 creds.secrets.clone()
377 };
378
379 let mut tx = self.db().begin_write().await?;
380
381 let (secrets, local_key) = if old_secrets.access_mode() >= access_mode {
382 (old_secrets, None)
383 } else {
384 let (new_secrets, local_key) =
385 metadata::get_access_secrets(&mut tx, local_secret.as_ref()).await?;
386
387 if new_secrets.access_mode() > old_secrets.access_mode() {
388 (new_secrets, local_key)
389 } else {
390 (old_secrets, None)
391 }
392 };
393
394 let secrets = secrets.with_mode(access_mode);
395
396 let writer_id = if secrets.can_write() {
397 metadata::get_or_generate_writer_id(&mut tx, local_key.as_deref()).await?
398 } else {
399 metadata::generate_writer_id()
400 };
401
402 tx.commit().await?;
403
404 if let Some(write_keys) = secrets.write_secrets().map(|secrets| &secrets.write_keys) {
407 self.shared
408 .vault
409 .store()
410 .migrate_data(writer_id, write_keys)
411 .await?;
412 }
413
414 self.update_credentials(Credentials { secrets, writer_id });
415
416 Ok(())
417 }
418
419 pub async fn set_credentials(&self, credentials: Credentials) -> Result<()> {
440 let expected_id = {
442 let mut conn = self.db().acquire().await?;
443 metadata::get_repository_id(&mut conn).await?
444 };
445
446 if credentials.secrets.id() != &expected_id {
447 return Err(Error::PermissionDenied);
448 }
449
450 if let Some(write_secrets) = credentials.secrets.write_secrets() {
451 self.shared
452 .vault
453 .store()
454 .migrate_data(credentials.writer_id, &write_secrets.write_keys)
455 .await?;
456 }
457
458 self.update_credentials(credentials);
459
460 Ok(())
461 }
462
463 pub async fn unlock_secrets(&self, local_secret: LocalSecret) -> Result<AccessSecrets> {
464 let mut tx = self.db().begin_write().await?;
465 Ok(metadata::get_access_secrets(&mut tx, Some(&local_secret))
466 .await?
467 .0)
468 }
469
470 pub fn metadata(&self) -> Metadata {
473 self.shared.vault.metadata()
474 }
475
476 pub async fn set_quota(&self, quota: Option<StorageSize>) -> Result<()> {
478 self.shared.vault.set_quota(quota).await
479 }
480
481 pub async fn quota(&self) -> Result<Option<StorageSize>> {
483 self.shared.vault.quota().await
484 }
485
486 pub async fn set_block_expiration(&self, block_expiration: Option<Duration>) -> Result<()> {
489 let mut tx = self.db().begin_write().await?;
490 metadata::block_expiration::set(&mut tx, block_expiration).await?;
491 tx.commit().await?;
492
493 self.shared.vault.set_block_expiration(block_expiration)
494 }
495
496 pub fn block_expiration(&self) -> Option<Duration> {
498 self.shared.vault.block_expiration()
499 }
500
501 pub fn last_block_expiration_time(&self) -> Option<SystemTime> {
504 self.shared.vault.last_block_expiration_time()
505 }
506
507 pub async fn size(&self) -> Result<StorageSize> {
509 self.shared.vault.size().await
510 }
511
512 pub fn handle(&self) -> RepositoryHandle {
513 RepositoryHandle {
514 vault: self.shared.vault.clone(),
515 }
516 }
517
518 pub async fn get_read_password_salt(&self) -> Result<PasswordSalt> {
519 let mut tx = self.db().begin_write().await?;
520 Ok(metadata::get_password_salt(&mut tx, metadata::KeyType::Read).await?)
521 }
522
523 pub async fn get_write_password_salt(&self) -> Result<PasswordSalt> {
524 let mut tx = self.db().begin_write().await?;
525 Ok(metadata::get_password_salt(&mut tx, metadata::KeyType::Write).await?)
526 }
527
528 pub fn monitor(&self) -> &StateMonitor {
530 self.shared.vault.monitor.node()
531 }
532
533 pub async fn export(&self, dst: &Path) -> Result<()> {
538 struct Cleanup<'a> {
541 path: &'a Path,
542 armed: bool,
543 }
544
545 impl Drop for Cleanup<'_> {
546 fn drop(&mut self) {
547 if !self.armed {
548 return;
549 }
550
551 if let Err(error) = std::fs::remove_file(self.path) {
552 tracing::error!(
553 path = ?self.path,
554 ?error,
555 "failed to delete partially exported repository",
556 );
557 }
558 }
559 }
560
561 let mut cleanup = Cleanup {
562 path: dst,
563 armed: true,
564 };
565
566 self.shared.vault.store().export(dst).await?;
568
569 let pool = db::open(dst).await?;
571 let credentials = self.credentials().with_mode(AccessMode::Read);
572 let access_mode = credentials.secrets.access_mode();
573 let monitor = RepositoryMonitor::new(StateMonitor::make_root(), &NoopRecorder);
574 let repo = Self::new(pool, credentials, monitor);
575
576 match access_mode {
577 AccessMode::Blind => {
578 repo.set_access(Some(AccessChange::Disable), Some(AccessChange::Disable))
579 .await?
580 }
581 AccessMode::Read => {
582 repo.set_access(
583 Some(AccessChange::Enable(None)),
584 Some(AccessChange::Disable),
585 )
586 .await?
587 }
588 AccessMode::Write => unreachable!(),
589 }
590
591 repo.close().await?;
592
593 cleanup.armed = false;
594
595 Ok(())
596 }
597
598 pub async fn lookup_type<P: AsRef<Utf8Path>>(&self, path: P) -> Result<EntryType> {
601 match path::decompose(path.as_ref()) {
602 Some((parent, name)) => {
603 let parent = self.open_directory(parent).await?;
604 Ok(parent.lookup_unique(name)?.entry_type())
605 }
606 None => Ok(EntryType::Directory),
607 }
608 }
609
610 pub async fn open_file<P: AsRef<Utf8Path>>(&self, path: P) -> Result<File> {
612 let (parent, name) = path::decompose(path.as_ref()).ok_or(Error::EntryIsDirectory)?;
613
614 self.cd(parent)
615 .await?
616 .lookup_unique(name)?
617 .file()?
618 .open()
619 .await
620 }
621
622 pub async fn open_file_version<P: AsRef<Utf8Path>>(
624 &self,
625 path: P,
626 branch_id: &PublicKey,
627 ) -> Result<File> {
628 let (parent, name) = path::decompose(path.as_ref()).ok_or(Error::EntryIsDirectory)?;
629
630 self.cd(parent)
631 .await?
632 .lookup_version(name, branch_id)?
633 .open()
634 .await
635 }
636
637 pub async fn open_directory<P: AsRef<Utf8Path>>(&self, path: P) -> Result<JointDirectory> {
639 self.cd(path).await
640 }
641
642 pub async fn create_file<P: AsRef<Utf8Path>>(&self, path: P) -> Result<File> {
644 let file = self
645 .local_branch()?
646 .ensure_file_exists(path.as_ref())
647 .await?;
648
649 Ok(file)
650 }
651
652 pub async fn create_directory<P: AsRef<Utf8Path>>(&self, path: P) -> Result<Directory> {
654 let dir = self
655 .local_branch()?
656 .ensure_directory_exists(path.as_ref())
657 .await?;
658
659 Ok(dir)
660 }
661
662 pub async fn remove_entry<P: AsRef<Utf8Path>>(&self, path: P) -> Result<()> {
664 let (parent, name) = path::decompose(path.as_ref()).ok_or(Error::OperationNotSupported)?;
665 let mut parent = self.cd(parent).await?;
666 parent.remove_entry(name).await?;
667
668 Ok(())
669 }
670
671 pub async fn remove_entry_recursively<P: AsRef<Utf8Path>>(&self, path: P) -> Result<()> {
673 let (parent, name) = path::decompose(path.as_ref()).ok_or(Error::OperationNotSupported)?;
674 let mut parent = self.cd(parent).await?;
675 parent.remove_entry_recursively(name).await?;
676
677 Ok(())
678 }
679
680 pub async fn move_entry<S: AsRef<Utf8Path>, D: AsRef<Utf8Path>>(
683 &self,
684 src: S,
685 dst: D,
686 ) -> Result<()> {
687 let (src_dir_path, src_name) = path::decompose(src.as_ref()).ok_or(Error::EntryNotFound)?;
688 let (dst_dir_path, dst_name) = path::decompose(dst.as_ref()).ok_or(Error::EntryNotFound)?;
689
690 let local_branch = self.local_branch()?;
691 let src_joint_dir = self.cd(src_dir_path).await?;
692
693 let (mut src_dir, src_name, src_type) = match src_joint_dir.lookup_unique(src_name)? {
695 JointEntryRef::File(entry) => {
696 let src_name = entry.name().to_string();
697
698 let mut file = entry.open().await?;
699 file.fork(local_branch.clone()).await?;
700
701 (file.parent().await?, Cow::Owned(src_name), EntryType::File)
702 }
703 JointEntryRef::Directory(entry) => {
704 let mut dir_to_move = entry
705 .open_with(MissingVersionStrategy::Skip, DirectoryFallback::Disabled)
706 .await?;
707 let dir_to_move = dir_to_move.merge().await?;
708
709 let src_dir = dir_to_move
710 .parent()
711 .await?
712 .ok_or(Error::OperationNotSupported )?;
713
714 (src_dir, Cow::Borrowed(src_name), EntryType::Directory)
715 }
716 };
717
718 let src_entry = src_dir.lookup(&src_name)?.clone_data();
719
720 let mut dst_joint_dir = self.cd(&dst_dir_path).await?;
721 let dst_dir = dst_joint_dir
722 .local_version_mut()
723 .ok_or(Error::PermissionDenied)?;
724
725 let dst_old_entry = dst_dir.lookup(dst_name);
726
727 let dst_old_vv = match (src_type, dst_old_entry) {
730 (EntryType::File | EntryType::Directory, Ok(EntryRef::Tombstone(old_entry))) => {
731 old_entry.version_vector().clone()
732 }
733 (EntryType::File | EntryType::Directory, Err(Error::EntryNotFound)) => {
734 VersionVector::new()
735 }
736 (EntryType::File | EntryType::Directory, Err(error)) => return Err(error),
737 (EntryType::File, Ok(EntryRef::File(old_entry))) => old_entry.version_vector().clone(),
738 (EntryType::Directory, Ok(EntryRef::Directory(old_entry))) => {
739 if old_entry
740 .open(DirectoryFallback::Disabled)
741 .await?
742 .entries()
743 .all(|entry| entry.is_tombstone())
744 {
745 old_entry.version_vector().clone()
746 } else {
747 return Err(Error::DirectoryNotEmpty);
748 }
749 }
750 (EntryType::File, Ok(EntryRef::Directory(_))) => return Err(Error::EntryIsDirectory),
751 (EntryType::Directory, Ok(EntryRef::File(_))) => return Err(Error::EntryIsFile),
752 };
753
754 let dst_vv = dst_old_vv
755 .merged(src_entry.version_vector())
756 .incremented(*local_branch.id());
757
758 src_dir
759 .move_entry(&src_name, src_entry, dst_dir, dst_name, dst_vv)
760 .await?;
761
762 Ok(())
763 }
764
765 pub fn local_branch(&self) -> Result<Branch> {
768 self.shared.local_branch()
769 }
770
/// Returns the branch with the given id (test builds only).
#[cfg(test)]
pub fn get_branch(&self, id: PublicKey) -> Result<Branch> {
    Shared::get_branch(&self.shared, id)
}
777
778 pub async fn get_branch_version_vector(&self, writer_id: &PublicKey) -> Result<VersionVector> {
780 Ok(self
781 .shared
782 .vault
783 .store()
784 .acquire_read()
785 .await?
786 .load_latest_approved_root_node(writer_id, RootNodeFilter::Any)
787 .await?
788 .proof
789 .into_version_vector())
790 }
791
792 pub async fn get_merged_version_vector(&self) -> Result<VersionVector> {
794 Ok(self
795 .shared
796 .vault
797 .store()
798 .acquire_read()
799 .await?
800 .load_latest_approved_root_nodes()
801 .try_fold(VersionVector::default(), |mut merged, node| {
802 merged.merge(&node.proof.version_vector);
803 future::ready(Ok(merged))
804 })
805 .await?)
806 }
807
808 pub fn subscribe(&self) -> broadcast::Receiver<Event> {
810 self.shared.vault.event_tx.subscribe()
811 }
812
813 pub async fn sync_progress(&self) -> Result<Progress> {
816 Ok(self.shared.vault.store().sync_progress().await?)
817 }
818
819 pub async fn check_integrity(&self) -> Result<bool> {
822 Ok(self.shared.vault.store().check_integrity().await?)
823 }
824
825 async fn root(&self) -> Result<JointDirectory> {
827 let local_branch = self.local_branch()?;
828 let branches = self.shared.load_branches().await?;
829
830 let branches = if local_branch.keys().write().is_some()
835 && branches
836 .iter()
837 .all(|branch| branch.id() != local_branch.id())
838 {
839 let mut branches = branches;
840 branches.push(local_branch.clone());
841 branches
842 } else {
843 branches
844 };
845
846 let mut dirs = Vec::new();
847
848 for branch in branches {
849 let dir = match branch
850 .open_root(DirectoryLocking::Enabled, DirectoryFallback::Enabled)
851 .await
852 {
853 Ok(dir) => dir,
854 Err(error @ Error::Store(store::Error::BranchNotFound)) => {
855 tracing::trace!(
856 branch_id = ?branch.id(),
857 ?error,
858 "Failed to open root directory"
859 );
860 continue;
863 }
864 Err(error @ Error::Store(store::Error::BlockNotFound)) => {
865 tracing::trace!(
866 branch_id = ?branch.id(),
867 ?error,
868 "Failed to open root directory"
869 );
870 continue;
873 }
874 Err(error) => {
875 tracing::error!(
876 branch_id = ?branch.id(),
877 ?error,
878 "Failed to open root directory"
879 );
880 return Err(error);
881 }
882 };
883
884 dirs.push(dir);
885 }
886
887 Ok(JointDirectory::new(Some(local_branch), dirs))
888 }
889
890 pub async fn cd<P: AsRef<Utf8Path>>(&self, path: P) -> Result<JointDirectory> {
891 self.root().await?.cd(path).await
892 }
893
894 pub async fn close(&self) -> Result<()> {
897 for task in [&self.worker_handle, &self.progress_reporter_handle] {
900 let task = task.lock().unwrap().take();
901 if let Some(task) = task {
902 task.abort();
903 task.await.ok();
904 }
905 }
906
907 self.shared.vault.store().close().await?;
908
909 Ok(())
910 }
911
912 pub async fn debug_print_root(&self) {
913 self.debug_print(DebugPrinter::new()).await
914 }
915
916 pub async fn debug_print(&self, print: DebugPrinter) {
917 print.display(&"Repository");
918
919 let branches = match self.shared.load_branches().await {
920 Ok(branches) => branches,
921 Err(error) => {
922 print.display(&format_args!("failed to load branches: {:?}", error));
923 return;
924 }
925 };
926
927 let writer_id = self.shared.credentials.read().unwrap().writer_id;
928
929 for branch in branches {
930 let print = print.indent();
931 let local = if branch.id() == &writer_id {
932 " (local)"
933 } else {
934 ""
935 };
936 print.display(&format_args!(
937 "Branch ID: {:?}{}, root block ID:{:?}",
938 branch.id(),
939 local,
940 branch.root_block_id().await
941 ));
942 let print = print.indent();
943 print.display(&format_args!(
944 "/, vv: {:?}",
945 branch.version_vector().await.unwrap_or_default()
946 ));
947 branch.debug_print(print.indent()).await;
948 }
949
950 print.display(&"Index");
951 let print = print.indent();
952 self.shared.vault.debug_print(print).await;
953 }
954
955 pub async fn count_blocks(&self) -> Result<u64> {
958 Ok(self.shared.vault.store().count_blocks().await?)
959 }
960
961 fn db(&self) -> &db::Pool {
962 self.shared.vault.store().db()
963 }
964
965 fn update_credentials(&self, credentials: Credentials) {
966 tracing::debug!(
967 parent: self.shared.vault.monitor.span(),
968 access = ?credentials.secrets.access_mode(),
969 writer_id = ?credentials.writer_id,
970 "Repository access mode changed"
971 );
972
973 self.shared
974 .vault
975 .block_tracker
976 .set_request_mode(block_request_mode(&credentials.secrets));
977
978 *self.shared.credentials.write().unwrap() = credentials;
979 *self.worker_handle.lock().unwrap() = Some(spawn_worker(self.shared.clone()));
980 }
981}
982
/// Handle to a repository, created by `Repository::handle`.
983pub struct RepositoryHandle {
// Clone of the repository's vault.
984 pub(crate) vault: Vault,
985}
986
/// State shared between `Repository` and its worker task (passed around as `Arc<Shared>`).
987struct Shared {
988 vault: Vault,
// Current credentials; read by the accessors and replaced in `Repository::update_credentials`.
989 credentials: BlockingRwLock<Credentials>,
// Cloned into every `Branch` created by `make_branch`.
990 branch_shared: BranchShared,
991}
992
993impl Shared {
994 fn new(pool: db::Pool, credentials: Credentials, monitor: RepositoryMonitor) -> Self {
995 let event_tx = EventSender::new(EVENT_CHANNEL_CAPACITY);
996 let vault = Vault::new(*credentials.secrets.id(), event_tx, pool, monitor);
997
998 vault
999 .block_tracker
1000 .set_request_mode(block_request_mode(&credentials.secrets));
1001
1002 Self {
1003 vault,
1004 credentials: BlockingRwLock::new(credentials),
1005 branch_shared: BranchShared::new(),
1006 }
1007 }
1008
1009 pub fn local_branch(&self) -> Result<Branch> {
1010 let credentials = self.credentials.read().unwrap();
1011
1012 Ok(self.make_branch(
1013 credentials.writer_id,
1014 credentials.secrets.keys().ok_or(Error::PermissionDenied)?,
1015 ))
1016 }
1017
1018 pub fn get_branch(&self, id: PublicKey) -> Result<Branch> {
1019 let credentials = self.credentials.read().unwrap();
1020 let keys = credentials.secrets.keys().ok_or(Error::PermissionDenied)?;
1021
1022 let keys = if id == credentials.writer_id {
1024 keys
1025 } else {
1026 keys.read_only()
1027 };
1028
1029 Ok(self.make_branch(id, keys))
1030 }
1031
1032 fn make_branch(&self, id: PublicKey, keys: AccessKeys) -> Branch {
1033 Branch::new(
1034 id,
1035 self.vault.store().clone(),
1036 keys,
1037 self.branch_shared.clone(),
1038 self.vault.event_tx.clone(),
1039 )
1040 }
1041
1042 pub async fn load_branches(&self) -> Result<Vec<Branch>> {
1043 self.vault
1044 .store()
1045 .acquire_read()
1046 .await?
1047 .load_latest_approved_root_nodes()
1048 .err_into()
1049 .and_then(|root_node| future::ready(self.get_branch(root_node.proof.writer_id)))
1050 .try_collect()
1051 .await
1052 }
1053}
1054
1055fn spawn_worker(shared: Arc<Shared>) -> ScopedJoinHandle<()> {
1056 let span = shared.vault.monitor.span().clone();
1057 scoped_task::spawn(worker::run(shared).instrument(span))
1058}
1059
1060async fn report_sync_progress(vault: Vault) {
1061 let mut prev_progress = Progress { value: 0, total: 0 };
1062
1063 let events = stream::unfold(vault.event_tx.subscribe(), |mut rx| async move {
1064 match rx.recv().await {
1065 Ok(_) | Err(RecvError::Lagged(_)) => Some(((), rx)),
1066 Err(RecvError::Closed) => None,
1067 }
1068 });
1069 let events = Throttle::new(events, Duration::from_secs(1));
1070 let mut events = pin!(events);
1071
1072 while events.next().await.is_some() {
1073 let next_progress = match vault.store().sync_progress().await {
1074 Ok(progress) => progress,
1075 Err(error) => {
1076 tracing::error!("Failed to retrieve sync progress: {:?}", error);
1077 continue;
1078 }
1079 };
1080
1081 if next_progress != prev_progress {
1082 prev_progress = next_progress;
1083 tracing::debug!(
1084 "Sync progress: {} bytes ({:.1})",
1085 prev_progress * BLOCK_SIZE as u64,
1086 prev_progress.percent()
1087 );
1088 }
1089 }
1090}
1091
1092fn block_request_mode(secrets: &AccessSecrets) -> BlockRequestMode {
1093 if secrets.can_read() {
1094 BlockRequestMode::Lazy
1095 } else {
1096 BlockRequestMode::Greedy
1097 }
1098}