1pub(crate) mod monitor;
2
3mod credentials;
4mod metadata;
5mod params;
6mod vault;
7mod worker;
8
9#[cfg(test)]
10mod tests;
11
12pub use self::{credentials::Credentials, metadata::Metadata, params::RepositoryParams};
13
14pub(crate) use self::{
15 metadata::{data_version, quota},
16 vault::Vault,
17};
18
19use self::monitor::RepositoryMonitor;
20use crate::{
21 access_control::{Access, AccessChange, AccessKeys, AccessMode, AccessSecrets, LocalSecret},
22 block_tracker::BlockRequestMode,
23 branch::{Branch, BranchShared},
24 crypto::{PasswordSalt, sign::PublicKey},
25 db::{self, DatabaseId},
26 debug::DebugPrinter,
27 directory::{Directory, DirectoryFallback, DirectoryLocking, EntryRef, EntryType},
28 error::{Error, Result},
29 event::{Event, EventSender},
30 file::File,
31 joint_directory::{JointDirectory, JointEntryRef, MissingVersionStrategy},
32 path,
33 progress::Progress,
34 protocol::{BLOCK_SIZE, RootNodeFilter, StorageSize},
35 store,
36 sync::stream::Throttle,
37 version_vector::VersionVector,
38};
39use camino::Utf8Path;
40use deadlock::{BlockingMutex, BlockingRwLock};
41use futures_util::{StreamExt, stream};
42use futures_util::{TryStreamExt, future};
43use metrics::{NoopRecorder, Recorder};
44use scoped_task::ScopedJoinHandle;
45use state_monitor::StateMonitor;
46use std::{
47 borrow::Cow,
48 path::{Path, PathBuf},
49 pin::pin,
50 sync::Arc,
51 time::SystemTime,
52};
53use tokio::{
54 sync::broadcast::{self, error::RecvError},
55 time::Duration,
56};
57use tracing::instrument::Instrument;
58
59const EVENT_CHANNEL_CAPACITY: usize = 10000;
60
61pub struct Repository {
62 shared: Arc<Shared>,
63 worker_handle: BlockingMutex<Option<ScopedJoinHandle<()>>>,
64 progress_reporter_handle: BlockingMutex<Option<ScopedJoinHandle<()>>>,
65}
66
67pub fn database_files(store_path: impl AsRef<Path>) -> Vec<PathBuf> {
78 ["", "-wal", "-shm"]
80 .into_iter()
81 .map(|suffix| {
82 let mut path = store_path.as_ref().as_os_str().to_owned();
83 path.push(suffix);
84 path.into()
85 })
86 .collect()
87}
88
89impl Repository {
90 pub async fn create(params: &RepositoryParams<impl Recorder>, access: Access) -> Result<Self> {
92 let pool = params.create().await?;
93 let device_id = params.device_id();
94 let monitor = params.monitor();
95
96 let mut tx = pool.begin_write().await?;
97
98 let local_keys = metadata::initialize_access_secrets(&mut tx, &access).await?;
99 let writer_id =
100 metadata::get_or_generate_writer_id(&mut tx, local_keys.write.as_deref()).await?;
101 metadata::set_device_id(&mut tx, &device_id).await?;
102
103 tx.commit().await?;
104
105 let credentials = Credentials {
106 secrets: access.secrets(),
107 writer_id,
108 };
109
110 Self::new(pool, credentials, monitor).init().await
111 }
112
113 pub async fn open(
115 params: &RepositoryParams<impl Recorder>,
116 local_secret: Option<LocalSecret>,
117 access_mode: AccessMode,
118 ) -> Result<Self> {
119 let pool = params.open().await?;
120 let monitor = params.monitor();
121 let device_id = params.device_id();
122
123 let mut tx = pool.begin_write().await?;
124
125 let (secrets, local_key) =
126 metadata::get_access_secrets(&mut tx, local_secret.as_ref()).await?;
127
128 let secrets = secrets.with_mode(access_mode);
129
130 let writer_id = if metadata::check_device_id(&mut tx, &device_id).await? {
131 if secrets.can_write() {
132 metadata::get_or_generate_writer_id(&mut tx, local_key.as_deref()).await?
133 } else {
134 metadata::generate_writer_id()
135 }
136 } else {
137 let writer_id = metadata::generate_writer_id();
144
145 metadata::set_device_id(&mut tx, &device_id).await?;
146 metadata::set_writer_id(&mut tx, &writer_id, local_key.as_deref()).await?;
147
148 writer_id
149 };
150
151 tx.commit().await?;
152
153 let credentials = Credentials { secrets, writer_id };
154
155 Self::new(pool, credentials, monitor).init().await
156 }
157
158 fn new(pool: db::Pool, credentials: Credentials, monitor: RepositoryMonitor) -> Self {
159 Self {
160 shared: Arc::new(Shared::new(pool, credentials, monitor)),
161 worker_handle: BlockingMutex::new(None),
162 progress_reporter_handle: BlockingMutex::new(None),
163 }
164 }
165
166 async fn init(self) -> Result<Self> {
167 let credentials = self.credentials();
168
169 if let Some(keys) = credentials
170 .secrets
171 .write_secrets()
172 .map(|secrets| &secrets.write_keys)
173 {
174 self.shared
175 .vault
176 .store()
177 .migrate_data(credentials.writer_id, keys)
178 .await?;
179 }
180
181 {
182 let mut conn = self.shared.vault.store().db().acquire().await?;
183 if let Some(block_expiration) = metadata::block_expiration::get(&mut conn).await? {
184 self.shared
185 .vault
186 .set_block_expiration(Some(block_expiration))?;
187 }
188 }
189
190 tracing::debug!(
191 parent: self.shared.vault.monitor.span(),
192 access = ?credentials.secrets.access_mode(),
193 writer_id = ?credentials.writer_id,
194 "Repository opened"
195 );
196
197 *self.worker_handle.lock().unwrap() = Some(spawn_worker(self.shared.clone()));
198
199 *self.progress_reporter_handle.lock().unwrap() = Some(scoped_task::spawn(
200 report_sync_progress(self.shared.vault.clone())
201 .instrument(self.shared.vault.monitor.span().clone()),
202 ));
203
204 Ok(self)
205 }
206
207 pub async fn database_id(&self) -> Result<DatabaseId> {
208 Ok(metadata::get_or_generate_database_id(self.db()).await?)
209 }
210
211 pub async fn requires_local_secret_for_reading(&self) -> Result<bool> {
212 let mut conn = self.db().acquire().await?;
213 Ok(metadata::requires_local_secret_for_reading(&mut conn).await?)
214 }
215
216 pub async fn requires_local_secret_for_writing(&self) -> Result<bool> {
217 let mut conn = self.db().acquire().await?;
218 Ok(metadata::requires_local_secret_for_writing(&mut conn).await?)
219 }
220
221 pub async fn set_access(
241 &self,
242 read_change: Option<AccessChange>,
243 write_change: Option<AccessChange>,
244 ) -> Result<()> {
245 let mut tx = self.db().begin_write().await?;
246
247 if let Some(change) = read_change {
248 self.set_read_access(&mut tx, change).await?;
249 }
250
251 if let Some(change) = write_change {
252 self.set_write_access(&mut tx, change).await?;
253 }
254
255 tx.commit().await?;
256
257 Ok(())
258 }
259
260 async fn set_read_access(
261 &self,
262 tx: &mut db::WriteTransaction,
263 change: AccessChange,
264 ) -> Result<()> {
265 let local = match &change {
266 AccessChange::Enable(Some(local_secret)) => {
267 Some(metadata::secret_to_key_and_salt(local_secret))
268 }
269 AccessChange::Enable(None) => None,
270 AccessChange::Disable => {
271 metadata::remove_read_key(tx).await?;
272 return Ok(());
273 }
274 };
275
276 let (id, read_key) = {
277 let cred = self.shared.credentials.read().unwrap();
278 (
279 *cred.secrets.id(),
280 cred.secrets
281 .read_key()
282 .ok_or(Error::PermissionDenied)?
283 .clone(),
284 )
285 };
286
287 metadata::set_read_key(
288 tx,
289 &id,
290 &read_key,
291 local.as_ref().map(|(k, s)| (k.as_ref(), s.as_ref())),
292 )
293 .await?;
294
295 Ok(())
296 }
297
298 async fn set_write_access(
299 &self,
300 tx: &mut db::WriteTransaction,
301 change: AccessChange,
302 ) -> Result<()> {
303 let local = match &change {
304 AccessChange::Enable(Some(local_secret)) => {
305 Some(metadata::secret_to_key_and_salt(local_secret))
306 }
307 AccessChange::Enable(None) => None,
308 AccessChange::Disable => {
309 metadata::remove_write_key(tx).await?;
310 return Ok(());
311 }
312 };
313
314 let (write_secrets, writer_id) = {
315 let cred = self.shared.credentials.read().unwrap();
316 (
317 cred.secrets
318 .write_secrets()
319 .ok_or(Error::PermissionDenied)?
320 .clone(),
321 cred.writer_id,
322 )
323 };
324
325 metadata::set_write_key(
326 tx,
327 &write_secrets,
328 local.as_ref().map(|(k, s)| (k.as_ref(), s.as_ref())),
329 )
330 .await?;
331 metadata::set_writer_id(tx, &writer_id, local.as_ref().map(|(k, _)| k.as_ref())).await?;
332
333 Ok(())
334 }
335
336 pub fn credentials(&self) -> Credentials {
340 self.shared.credentials.read().unwrap().clone()
341 }
342
343 pub fn secrets(&self) -> AccessSecrets {
344 self.shared.credentials.read().unwrap().secrets.clone()
345 }
346
347 pub fn access_mode(&self) -> AccessMode {
349 self.shared
350 .credentials
351 .read()
352 .unwrap()
353 .secrets
354 .access_mode()
355 }
356
357 pub async fn set_access_mode(
363 &self,
364 access_mode: AccessMode,
365 local_secret: Option<LocalSecret>,
366 ) -> Result<()> {
367 let old_secrets = {
370 let creds = self.shared.credentials.read().unwrap();
371
372 if creds.secrets.access_mode() == access_mode {
373 return Ok(());
374 }
375
376 creds.secrets.clone()
377 };
378
379 let mut tx = self.db().begin_write().await?;
380
381 let (secrets, local_key) = if old_secrets.access_mode() >= access_mode {
382 (old_secrets, None)
383 } else {
384 let (new_secrets, local_key) =
385 metadata::get_access_secrets(&mut tx, local_secret.as_ref()).await?;
386
387 if new_secrets.access_mode() > old_secrets.access_mode() {
388 (new_secrets, local_key)
389 } else {
390 (old_secrets, None)
391 }
392 };
393
394 let secrets = secrets.with_mode(access_mode);
395
396 let writer_id = if secrets.can_write() {
397 metadata::get_or_generate_writer_id(&mut tx, local_key.as_deref()).await?
398 } else {
399 metadata::generate_writer_id()
400 };
401
402 tx.commit().await?;
403
404 if let Some(write_keys) = secrets.write_secrets().map(|secrets| &secrets.write_keys) {
407 self.shared
408 .vault
409 .store()
410 .migrate_data(writer_id, write_keys)
411 .await?;
412 }
413
414 self.update_credentials(Credentials { secrets, writer_id });
415
416 Ok(())
417 }
418
419 pub async fn set_credentials(&self, credentials: Credentials) -> Result<()> {
440 let expected_id = {
442 let mut conn = self.db().acquire().await?;
443 metadata::get_repository_id(&mut conn).await?
444 };
445
446 if credentials.secrets.id() != &expected_id {
447 return Err(Error::PermissionDenied);
448 }
449
450 if let Some(write_secrets) = credentials.secrets.write_secrets() {
451 self.shared
452 .vault
453 .store()
454 .migrate_data(credentials.writer_id, &write_secrets.write_keys)
455 .await?;
456 }
457
458 self.update_credentials(credentials);
459
460 Ok(())
461 }
462
463 pub async fn unlock_secrets(&self, local_secret: LocalSecret) -> Result<AccessSecrets> {
464 let mut tx = self.db().begin_write().await?;
465 Ok(metadata::get_access_secrets(&mut tx, Some(&local_secret))
466 .await?
467 .0)
468 }
469
470 pub fn metadata(&self) -> Metadata {
473 self.shared.vault.metadata()
474 }
475
476 pub async fn set_quota(&self, quota: Option<StorageSize>) -> Result<()> {
478 self.shared.vault.set_quota(quota).await
479 }
480
481 pub async fn quota(&self) -> Result<Option<StorageSize>> {
483 self.shared.vault.quota().await
484 }
485
486 pub async fn set_block_expiration(&self, block_expiration: Option<Duration>) -> Result<()> {
489 let mut tx = self.db().begin_write().await?;
490 metadata::block_expiration::set(&mut tx, block_expiration).await?;
491 tx.commit().await?;
492
493 self.shared.vault.set_block_expiration(block_expiration)
494 }
495
496 pub fn block_expiration(&self) -> Option<Duration> {
498 self.shared.vault.block_expiration()
499 }
500
501 pub fn last_block_expiration_time(&self) -> Option<SystemTime> {
504 self.shared.vault.last_block_expiration_time()
505 }
506
507 pub async fn size(&self) -> Result<StorageSize> {
509 self.shared.vault.size().await
510 }
511
512 pub fn handle(&self) -> RepositoryHandle {
513 RepositoryHandle {
514 vault: self.shared.vault.clone(),
515 }
516 }
517
518 pub async fn get_read_password_salt(&self) -> Result<PasswordSalt> {
519 let mut tx = self.db().begin_write().await?;
520 Ok(metadata::get_password_salt(&mut tx, metadata::KeyType::Read).await?)
521 }
522
523 pub async fn get_write_password_salt(&self) -> Result<PasswordSalt> {
524 let mut tx = self.db().begin_write().await?;
525 Ok(metadata::get_password_salt(&mut tx, metadata::KeyType::Write).await?)
526 }
527
528 pub fn monitor(&self) -> &StateMonitor {
530 self.shared.vault.monitor.node()
531 }
532
533 pub async fn export(&self, dst: &Path) -> Result<()> {
538 struct Cleanup<'a> {
541 path: &'a Path,
542 armed: bool,
543 }
544
545 impl Drop for Cleanup<'_> {
546 fn drop(&mut self) {
547 if !self.armed {
548 return;
549 }
550
551 if let Err(error) = std::fs::remove_file(self.path) {
552 tracing::error!(
553 path = ?self.path,
554 ?error,
555 "failed to delete partially exported repository",
556 );
557 }
558 }
559 }
560
561 let mut cleanup = Cleanup {
562 path: dst,
563 armed: true,
564 };
565
566 self.shared.vault.store().export(dst).await?;
568
569 let pool = db::open(dst).await?;
571 let credentials = self.credentials().with_mode(AccessMode::Read);
572 let access_mode = credentials.secrets.access_mode();
573 let monitor = RepositoryMonitor::new(StateMonitor::make_root(), &NoopRecorder);
574 let repo = Self::new(pool, credentials, monitor);
575
576 match access_mode {
577 AccessMode::Blind => {
578 repo.set_access(Some(AccessChange::Disable), Some(AccessChange::Disable))
579 .await?
580 }
581 AccessMode::Read => {
582 repo.set_access(
583 Some(AccessChange::Enable(None)),
584 Some(AccessChange::Disable),
585 )
586 .await?
587 }
588 AccessMode::Write => unreachable!(),
589 }
590
591 repo.close().await?;
592
593 cleanup.armed = false;
594
595 Ok(())
596 }
597
598 pub async fn lookup_type<P: AsRef<Utf8Path>>(&self, path: P) -> Result<EntryType> {
601 match path::decompose(path.as_ref()) {
602 Some((parent, name)) => {
603 let parent = self.open_directory(parent).await?;
604 Ok(parent.lookup_unique(name)?.entry_type())
605 }
606 None => Ok(EntryType::Directory),
607 }
608 }
609
610 pub async fn open_file<P: AsRef<Utf8Path>>(&self, path: P) -> Result<File> {
612 let (parent, name) = path::decompose(path.as_ref()).ok_or(Error::EntryIsDirectory)?;
613
614 self.cd(parent)
615 .await?
616 .lookup_unique(name)?
617 .file()?
618 .open()
619 .await
620 }
621
622 pub async fn open_file_version<P: AsRef<Utf8Path>>(
624 &self,
625 path: P,
626 branch_id: &PublicKey,
627 ) -> Result<File> {
628 let (parent, name) = path::decompose(path.as_ref()).ok_or(Error::EntryIsDirectory)?;
629
630 self.cd(parent)
631 .await?
632 .lookup_version(name, branch_id)?
633 .open()
634 .await
635 }
636
637 pub async fn open_directory<P: AsRef<Utf8Path>>(&self, path: P) -> Result<JointDirectory> {
639 self.cd(path).await
640 }
641
642 pub async fn create_file<P: AsRef<Utf8Path>>(&self, path: P) -> Result<File> {
644 let file = self
645 .local_branch()?
646 .ensure_file_exists(path.as_ref())
647 .await?;
648
649 Ok(file)
650 }
651
652 pub async fn create_directory<P: AsRef<Utf8Path>>(&self, path: P) -> Result<Directory> {
654 let dir = self
655 .local_branch()?
656 .ensure_directory_exists(path.as_ref())
657 .await?;
658
659 Ok(dir)
660 }
661
662 pub async fn remove_entry<P: AsRef<Utf8Path>>(&self, path: P) -> Result<()> {
664 let (parent, name) = path::decompose(path.as_ref()).ok_or(Error::OperationNotSupported)?;
665 let mut parent = self.cd(parent).await?;
666 parent.remove_entry(name).await?;
667
668 Ok(())
669 }
670
671 pub async fn remove_entry_recursively<P: AsRef<Utf8Path>>(&self, path: P) -> Result<()> {
673 let (parent, name) = path::decompose(path.as_ref()).ok_or(Error::OperationNotSupported)?;
674 let mut parent = self.cd(parent).await?;
675 parent.remove_entry_recursively(name).await?;
676
677 Ok(())
678 }
679
680 pub async fn move_entry<S: AsRef<Utf8Path>, D: AsRef<Utf8Path>>(
683 &self,
684 src: S,
685 dst: D,
686 ) -> Result<()> {
687 let (src_dir_path, src_name) = path::decompose(src.as_ref()).ok_or(Error::EntryNotFound)?;
688 let (dst_dir_path, dst_name) = path::decompose(dst.as_ref()).ok_or(Error::EntryNotFound)?;
689
690 let local_branch = self.local_branch()?;
691 let src_joint_dir = self.cd(src_dir_path).await?;
692
693 let (mut src_dir, src_name, src_type) = match src_joint_dir.lookup_unique(src_name)? {
695 JointEntryRef::File(entry) => {
696 let src_name = entry.name().to_string();
697
698 let mut file = entry.open().await?;
699 file.fork(local_branch.clone()).await?;
700
701 (file.parent().await?, Cow::Owned(src_name), EntryType::File)
702 }
703 JointEntryRef::Directory(entry) => {
704 let mut dir_to_move = entry
705 .open_with(MissingVersionStrategy::Skip, DirectoryFallback::Disabled)
706 .await?;
707
708 let src_dir = dir_to_move
709 .merge()
710 .await?
711 .1
712 .parent()
713 .await?
714 .ok_or(Error::OperationNotSupported )?;
715
716 (src_dir, Cow::Borrowed(src_name), EntryType::Directory)
717 }
718 };
719
720 let src_entry = src_dir.lookup(&src_name)?.clone_data();
721
722 let mut dst_joint_dir = self.cd(&dst_dir_path).await?;
723 let dst_dir = dst_joint_dir
724 .local_version_mut()
725 .ok_or(Error::PermissionDenied)?;
726
727 let dst_old_entry = dst_dir.lookup(dst_name);
728
729 let dst_old_vv = match (src_type, dst_old_entry) {
732 (EntryType::File | EntryType::Directory, Ok(EntryRef::Tombstone(old_entry))) => {
733 old_entry.version_vector().clone()
734 }
735 (EntryType::File | EntryType::Directory, Err(Error::EntryNotFound)) => {
736 VersionVector::new()
737 }
738 (EntryType::File | EntryType::Directory, Err(error)) => return Err(error),
739 (EntryType::File, Ok(EntryRef::File(old_entry))) => old_entry.version_vector().clone(),
740 (EntryType::Directory, Ok(EntryRef::Directory(old_entry))) => {
741 if old_entry
742 .open(DirectoryFallback::Disabled)
743 .await?
744 .entries()
745 .all(|entry| entry.is_tombstone())
746 {
747 old_entry.version_vector().clone()
748 } else {
749 return Err(Error::DirectoryNotEmpty);
750 }
751 }
752 (EntryType::File, Ok(EntryRef::Directory(_))) => return Err(Error::EntryIsDirectory),
753 (EntryType::Directory, Ok(EntryRef::File(_))) => return Err(Error::EntryIsFile),
754 };
755
756 let dst_vv = dst_old_vv
757 .merged(src_entry.version_vector())
758 .incremented(*local_branch.id());
759
760 src_dir
761 .move_entry(&src_name, src_entry, dst_dir, dst_name, dst_vv)
762 .await?;
763
764 Ok(())
765 }
766
767 pub fn local_branch(&self) -> Result<Branch> {
770 self.shared.local_branch()
771 }
772
773 #[cfg(test)]
776 pub fn get_branch(&self, id: PublicKey) -> Result<Branch> {
777 self.shared.get_branch(id)
778 }
779
780 pub async fn get_branch_version_vector(&self, writer_id: &PublicKey) -> Result<VersionVector> {
782 Ok(self
783 .shared
784 .vault
785 .store()
786 .acquire_read()
787 .await?
788 .load_latest_approved_root_node(writer_id, RootNodeFilter::Any)
789 .await?
790 .proof
791 .into_version_vector())
792 }
793
794 pub async fn get_merged_version_vector(&self) -> Result<VersionVector> {
796 Ok(self
797 .shared
798 .vault
799 .store()
800 .acquire_read()
801 .await?
802 .load_latest_approved_root_nodes()
803 .try_fold(VersionVector::default(), |mut merged, node| {
804 merged.merge(&node.proof.version_vector);
805 future::ready(Ok(merged))
806 })
807 .await?)
808 }
809
810 pub fn subscribe(&self) -> broadcast::Receiver<Event> {
812 self.shared.vault.event_tx.subscribe()
813 }
814
815 pub async fn sync_progress(&self) -> Result<Progress> {
818 Ok(self.shared.vault.store().sync_progress().await?)
819 }
820
821 pub async fn check_integrity(&self) -> Result<bool> {
824 Ok(self.shared.vault.store().check_integrity().await?)
825 }
826
827 async fn root(&self) -> Result<JointDirectory> {
829 let local_branch = self.local_branch()?;
830 let branches = self.shared.load_branches().await?;
831
832 let branches = if local_branch.keys().write().is_some()
837 && branches
838 .iter()
839 .all(|branch| branch.id() != local_branch.id())
840 {
841 let mut branches = branches;
842 branches.push(local_branch.clone());
843 branches
844 } else {
845 branches
846 };
847
848 let mut dirs = Vec::new();
849
850 for branch in branches {
851 let dir = match branch
852 .open_root(DirectoryLocking::Enabled, DirectoryFallback::Enabled)
853 .await
854 {
855 Ok(dir) => dir,
856 Err(error @ Error::Store(store::Error::BranchNotFound)) => {
857 tracing::trace!(
858 branch_id = ?branch.id(),
859 ?error,
860 "Failed to open root directory"
861 );
862 continue;
865 }
866 Err(error @ Error::Store(store::Error::BlockNotFound)) => {
867 tracing::trace!(
868 branch_id = ?branch.id(),
869 ?error,
870 "Failed to open root directory"
871 );
872 continue;
875 }
876 Err(error) => {
877 tracing::error!(
878 branch_id = ?branch.id(),
879 ?error,
880 "Failed to open root directory"
881 );
882 return Err(error);
883 }
884 };
885
886 dirs.push(dir);
887 }
888
889 Ok(JointDirectory::new(Some(local_branch), dirs))
890 }
891
892 pub async fn cd<P: AsRef<Utf8Path>>(&self, path: P) -> Result<JointDirectory> {
893 self.root().await?.cd(path).await
894 }
895
896 pub async fn close(&self) -> Result<()> {
899 for task in [&self.worker_handle, &self.progress_reporter_handle] {
902 let task = task.lock().unwrap().take();
903 if let Some(task) = task {
904 task.abort();
905 task.await.ok();
906 }
907 }
908
909 self.shared.vault.store().close().await?;
910
911 Ok(())
912 }
913
914 pub async fn debug_print_root(&self) {
915 self.debug_print(DebugPrinter::new()).await
916 }
917
918 pub async fn debug_print(&self, print: DebugPrinter) {
919 print.display(&"Repository");
920
921 let branches = match self.shared.load_branches().await {
922 Ok(branches) => branches,
923 Err(error) => {
924 print.display(&format_args!("failed to load branches: {error:?}"));
925 return;
926 }
927 };
928
929 let writer_id = self.shared.credentials.read().unwrap().writer_id;
930
931 for branch in branches {
932 let print = print.indent();
933 let local = if branch.id() == &writer_id {
934 " (local)"
935 } else {
936 ""
937 };
938 print.display(&format_args!(
939 "Branch ID: {:?}{}, root block ID:{:?}",
940 branch.id(),
941 local,
942 branch.root_block_id().await
943 ));
944 let print = print.indent();
945 print.display(&format_args!(
946 "/, vv: {:?}",
947 branch.version_vector().await.unwrap_or_default()
948 ));
949 branch.debug_print(print.indent()).await;
950 }
951
952 print.display(&"Index");
953 let print = print.indent();
954 self.shared.vault.debug_print(print).await;
955 }
956
957 pub async fn count_blocks(&self) -> Result<u64> {
960 Ok(self.shared.vault.store().count_blocks().await?)
961 }
962
963 fn db(&self) -> &db::Pool {
964 self.shared.vault.store().db()
965 }
966
967 fn update_credentials(&self, credentials: Credentials) {
968 tracing::debug!(
969 parent: self.shared.vault.monitor.span(),
970 access = ?credentials.secrets.access_mode(),
971 writer_id = ?credentials.writer_id,
972 "Repository access mode changed"
973 );
974
975 self.shared
976 .vault
977 .block_tracker
978 .set_request_mode(block_request_mode(&credentials.secrets));
979
980 *self.shared.credentials.write().unwrap() = credentials;
981 *self.worker_handle.lock().unwrap() = Some(spawn_worker(self.shared.clone()));
982 }
983}
984
985pub struct RepositoryHandle {
986 pub(crate) vault: Vault,
987}
988
989struct Shared {
990 vault: Vault,
991 credentials: BlockingRwLock<Credentials>,
992 branch_shared: BranchShared,
993}
994
995impl Shared {
996 fn new(pool: db::Pool, credentials: Credentials, monitor: RepositoryMonitor) -> Self {
997 let event_tx = EventSender::new(EVENT_CHANNEL_CAPACITY);
998 let vault = Vault::new(*credentials.secrets.id(), event_tx, pool, monitor);
999
1000 vault
1001 .block_tracker
1002 .set_request_mode(block_request_mode(&credentials.secrets));
1003
1004 Self {
1005 vault,
1006 credentials: BlockingRwLock::new(credentials),
1007 branch_shared: BranchShared::new(),
1008 }
1009 }
1010
1011 pub fn local_branch(&self) -> Result<Branch> {
1012 let credentials = self.credentials.read().unwrap();
1013
1014 Ok(self.make_branch(
1015 credentials.writer_id,
1016 credentials.secrets.keys().ok_or(Error::PermissionDenied)?,
1017 ))
1018 }
1019
1020 pub fn get_branch(&self, id: PublicKey) -> Result<Branch> {
1021 let credentials = self.credentials.read().unwrap();
1022 let keys = credentials.secrets.keys().ok_or(Error::PermissionDenied)?;
1023
1024 let keys = if id == credentials.writer_id {
1026 keys
1027 } else {
1028 keys.read_only()
1029 };
1030
1031 Ok(self.make_branch(id, keys))
1032 }
1033
1034 fn make_branch(&self, id: PublicKey, keys: AccessKeys) -> Branch {
1035 Branch::new(
1036 id,
1037 self.vault.store().clone(),
1038 keys,
1039 self.branch_shared.clone(),
1040 self.vault.event_tx.clone(),
1041 )
1042 }
1043
1044 pub async fn load_branches(&self) -> Result<Vec<Branch>> {
1045 self.vault
1046 .store()
1047 .acquire_read()
1048 .await?
1049 .load_latest_approved_root_nodes()
1050 .err_into()
1051 .and_then(|root_node| future::ready(self.get_branch(root_node.proof.writer_id)))
1052 .try_collect()
1053 .await
1054 }
1055}
1056
1057fn spawn_worker(shared: Arc<Shared>) -> ScopedJoinHandle<()> {
1058 let span = shared.vault.monitor.span().clone();
1059 scoped_task::spawn(worker::run(shared).instrument(span))
1060}
1061
1062async fn report_sync_progress(vault: Vault) {
1063 let mut prev_progress = Progress { value: 0, total: 0 };
1064
1065 let events = stream::unfold(vault.event_tx.subscribe(), |mut rx| async move {
1066 match rx.recv().await {
1067 Ok(_) | Err(RecvError::Lagged(_)) => Some(((), rx)),
1068 Err(RecvError::Closed) => None,
1069 }
1070 });
1071 let events = Throttle::new(events, Duration::from_secs(1));
1072 let mut events = pin!(events);
1073
1074 while events.next().await.is_some() {
1075 let next_progress = match vault.store().sync_progress().await {
1076 Ok(progress) => progress,
1077 Err(error) => {
1078 tracing::error!("Failed to retrieve sync progress: {:?}", error);
1079 continue;
1080 }
1081 };
1082
1083 if next_progress != prev_progress {
1084 prev_progress = next_progress;
1085 tracing::debug!(
1086 "Sync progress: {} bytes ({:.1})",
1087 prev_progress * BLOCK_SIZE as u64,
1088 prev_progress.percent()
1089 );
1090 }
1091 }
1092}
1093
1094fn block_request_mode(secrets: &AccessSecrets) -> BlockRequestMode {
1095 if secrets.can_read() {
1096 BlockRequestMode::Lazy
1097 } else {
1098 BlockRequestMode::Greedy
1099 }
1100}