ouisync/branch/
mod.rs

1use crate::{
2    access_control::AccessKeys,
3    blob::lock::{BranchLocker, Locker},
4    crypto::sign::PublicKey,
5    debug::DebugPrinter,
6    directory::{Directory, DirectoryFallback, DirectoryLocking, EntryRef},
7    error::{Error, Result},
8    event::{EventScope, EventSender, Payload},
9    file::File,
10    path,
11    protocol::{BlockId, Locator, Proof, RootNodeFilter},
12    store::{self, Store},
13    version_vector::VersionVector,
14};
15use camino::{Utf8Component, Utf8Path};
16
/// Handle to a single branch, identified by a public key.
///
/// Bundles the branch id with the store, the access keys, the state shared among all branches
/// and the event sender used for notifications. The derived `Clone` clones each of these fields.
#[derive(Clone)]
pub struct Branch {
    /// Public key identifying this branch.
    id: PublicKey,
    /// Store the branch data is read from and written to.
    store: Store,
    /// Access keys used when operating on this branch.
    keys: AccessKeys,
    /// State shared among all branches (currently the locker).
    shared: BranchShared,
    /// Sender used to emit events for this branch.
    event_tx: EventSender,
}
25
26impl Branch {
    /// Assembles a `Branch` from its parts.
    ///
    /// The arguments are only stored in the returned value; nothing is loaded from `store` here.
    pub(crate) fn new(
        id: PublicKey,
        store: Store,
        keys: AccessKeys,
        shared: BranchShared,
        event_tx: EventSender,
    ) -> Self {
        Self {
            id,
            store,
            keys,
            shared,
            event_tx,
        }
    }
42
43    /// Binds the given event scope to this branch. Any event from this branch or any objects
44    /// belonging to it (files, directories) will be sent with this scope.
45    pub(crate) fn with_event_scope(self, event_scope: EventScope) -> Self {
46        Self {
47            event_tx: self.event_tx.with_scope(event_scope),
48            ..self
49        }
50    }
51
    /// Returns the public key that identifies this branch.
    pub fn id(&self) -> &PublicKey {
        &self.id
    }
55
    /// Returns the store backing this branch.
    pub(crate) fn store(&self) -> &Store {
        &self.store
    }
59
60    pub async fn version_vector(&self) -> Result<VersionVector> {
61        match self.proof().await {
62            Ok(proof) => Ok(proof.into_version_vector()),
63            Err(Error::Store(store::Error::BranchNotFound)) => Ok(VersionVector::new()),
64            Err(error) => Err(error),
65        }
66    }
67
68    pub(crate) async fn proof(&self) -> Result<Proof> {
69        Ok(self
70            .store
71            .acquire_read()
72            .await?
73            .load_latest_approved_root_node(self.id(), RootNodeFilter::Any)
74            .await?
75            .proof)
76    }
77
    /// Returns the access keys this branch was created (or reopened) with.
    pub(crate) fn keys(&self) -> &AccessKeys {
        &self.keys
    }
81
82    pub(crate) async fn open_root(
83        &self,
84        locking: DirectoryLocking,
85        fallback: DirectoryFallback,
86    ) -> Result<Directory> {
87        Directory::open_root(self.clone(), locking, fallback).await
88    }
89
90    pub(crate) async fn open_or_create_root(&self) -> Result<Directory> {
91        Directory::open_or_create_root(self.clone(), VersionVector::new()).await
92    }
93
94    /// Ensures that the directory at the specified path exists including all its ancestors.
95    /// Note: non-normalized paths (i.e. containing "..") or Windows-style drive prefixes
96    /// (e.g. "C:") are not supported.
97    pub(crate) async fn ensure_directory_exists(&self, path: &Utf8Path) -> Result<Directory> {
98        let mut curr = self.open_or_create_root().await?;
99
100        for component in path.components() {
101            match component {
102                Utf8Component::RootDir | Utf8Component::CurDir => (),
103                Utf8Component::Normal(name) => {
104                    let next = match curr.lookup(name) {
105                        Ok(EntryRef::Directory(entry)) => {
106                            Some(entry.open(DirectoryFallback::Disabled).await?)
107                        }
108                        Ok(EntryRef::File(_)) => return Err(Error::EntryIsFile),
109                        Ok(EntryRef::Tombstone(_)) | Err(Error::EntryNotFound) => None,
110                        Err(error) => return Err(error),
111                    };
112
113                    let next = if let Some(next) = next {
114                        next
115                    } else {
116                        curr.create_directory(
117                            name.to_string(),
118                            rand::random(),
119                            &VersionVector::new(),
120                        )
121                        .await?
122                    };
123
124                    curr = next;
125                }
126                Utf8Component::Prefix(_) | Utf8Component::ParentDir => {
127                    return Err(Error::OperationNotSupported)
128                }
129            }
130        }
131
132        Ok(curr)
133    }
134
135    pub(crate) async fn ensure_file_exists(&self, path: &Utf8Path) -> Result<File> {
136        let (parent, name) = path::decompose(path).ok_or(Error::EntryIsDirectory)?;
137        self.ensure_directory_exists(parent)
138            .await?
139            .create_file(name.to_string())
140            .await
141    }
142
143    pub(crate) async fn root_block_id(&self) -> Result<BlockId> {
144        let (block_id, _) = self
145            .store
146            .begin_read()
147            .await?
148            .find_block(self.id(), &Locator::ROOT.encode(self.keys().read()))
149            .await?;
150
151        Ok(block_id)
152    }
153
154    pub(crate) fn locker(&self) -> BranchLocker {
155        self.shared.locker.branch(*self.id())
156    }
157
158    pub(crate) fn notify(&self) -> BranchEventSender {
159        BranchEventSender {
160            event_tx: self.event_tx.clone(),
161            branch_id: *self.id(),
162        }
163    }
164
165    pub async fn debug_print(&self, print: DebugPrinter) {
166        match self
167            .open_root(DirectoryLocking::Disabled, DirectoryFallback::Disabled)
168            .await
169        {
170            Ok(root) => root.debug_print(print).await,
171            Err(error) => {
172                print.display(&format_args!("failed to open root directory: {error:?}"))
173            }
174        }
175    }
176
    /// Test-only: returns this branch with its access keys replaced by `keys`. All other fields
    /// are kept as they are.
    #[cfg(test)]
    pub(crate) fn reopen(self, keys: AccessKeys) -> Self {
        Self { keys, ..self }
    }
181
182    /// Clones (the latest snapshot of) this branch into another branch and returns that branch.
183    #[cfg(test)]
184    pub(crate) async fn clone_into(&self, dst_id: PublicKey) -> Result<Self> {
185        let mut tx = self.store().begin_write().await?;
186
187        match tx
188            .load_latest_approved_root_node(self.id(), RootNodeFilter::Any)
189            .await
190        {
191            Ok(node) => {
192                tx.clone_root_node_into(
193                    node,
194                    dst_id,
195                    self.keys().write().ok_or(Error::PermissionDenied)?,
196                )
197                .await?;
198            }
199            Err(store::Error::BranchNotFound) => (),
200            Err(error) => return Err(error.into()),
201        }
202
203        tx.commit().await?;
204
205        Ok(Self {
206            id: dst_id,
207            ..self.clone()
208        })
209    }
210}
211
212/// State shared among all branches.
213#[derive(Clone)]
214pub(crate) struct BranchShared {
215    pub locker: Locker,
216}
217
218impl BranchShared {
219    pub fn new() -> Self {
220        Self {
221            locker: Locker::new(),
222        }
223    }
224}
225
226/// Sender to send event notification for the given branch.
227#[derive(Clone)]
228pub(crate) struct BranchEventSender {
229    event_tx: EventSender,
230    branch_id: PublicKey,
231}
232
233impl BranchEventSender {
234    pub fn send(&self) {
235        self.event_tx
236            .send(Payload::SnapshotApproved(self.branch_id));
237    }
238}
239
// Tests for `Branch`. Each test builds its own branch on a temporary database via `setup`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{access_control::WriteSecrets, blob::BlobId, db, event::EventSender};
    use assert_matches::assert_matches;
    use tempfile::TempDir;

    // Ensuring "/" exists must yield the root directory itself.
    #[tokio::test(flavor = "multi_thread")]
    async fn ensure_root_directory_exists() {
        let (_base_dir, branch) = setup().await;
        let dir = branch.ensure_directory_exists("/".into()).await.unwrap();
        assert_eq!(dir.blob_id(), &BlobId::ROOT);
    }

    // A subdirectory created through `ensure_directory_exists` must be visible from a root
    // handle that was opened earlier, once that handle is refreshed.
    #[tokio::test(flavor = "multi_thread")]
    async fn ensure_subdirectory_exists() {
        let (_base_dir, branch) = setup().await;

        let mut root = branch.open_or_create_root().await.unwrap();

        branch
            .ensure_directory_exists(Utf8Path::new("/dir"))
            .await
            .unwrap();

        root.refresh().await.unwrap();
        let _ = root.lookup("dir").unwrap();
    }

    // Modifications made through a branch reopened with read-only keys must fail with
    // `PermissionDenied` when flushed.
    #[tokio::test(flavor = "multi_thread")]
    async fn attempt_to_modify_file_on_read_only_branch() {
        let (_base_dir, branch) = setup().await;

        // Create and populate the file while the branch still has write access.
        let mut file = branch
            .open_or_create_root()
            .await
            .unwrap()
            .create_file("test.txt".into())
            .await
            .unwrap();
        file.write_all(b"foo").await.unwrap();
        file.flush().await.unwrap();
        drop(file);

        // Downgrade the keys to read-only and reopen the branch with them.
        let keys = branch.keys().clone().read_only();
        let branch = branch.reopen(keys);

        // The root directory is a temporary of this statement and is dropped at its end.
        let mut file = branch
            .open_root(DirectoryLocking::Enabled, DirectoryFallback::Disabled)
            .await
            .unwrap()
            .lookup("test.txt")
            .unwrap()
            .file()
            .unwrap()
            .open()
            .await
            .unwrap();

        // `truncate` itself succeeds here; the permission error only shows up on flush.
        file.truncate(0).unwrap();
        assert_matches!(file.flush().await, Err(Error::PermissionDenied));

        // Same for `write_all`: the write succeeds, the flush is rejected.
        file.write_all(b"bar").await.unwrap();
        assert_matches!(file.flush().await, Err(Error::PermissionDenied));
    }

    // Creates a branch with a random id and random write secrets on a temporary database.
    // The returned `TempDir` is handed to the caller, which keeps it bound for the whole test.
    async fn setup() -> (TempDir, Branch) {
        let (base_dir, pool) = db::create_temp().await.unwrap();

        let writer_id = PublicKey::random();
        let secrets = WriteSecrets::random();
        let event_tx = EventSender::new(1);

        let store = Store::new(pool);
        let shared = BranchShared::new();
        let branch = Branch::new(writer_id, store, secrets.into(), shared, event_tx);

        (base_dir, branch)
    }
}