ouisync/branch/
mod.rs

1use crate::{
2    access_control::AccessKeys,
3    blob::lock::{BranchLocker, Locker},
4    crypto::sign::PublicKey,
5    debug::DebugPrinter,
6    directory::{Directory, DirectoryFallback, DirectoryLocking, EntryRef},
7    error::{Error, Result},
8    event::{EventScope, EventSender, Payload},
9    file::File,
10    path,
11    protocol::{BlockId, Locator, Proof, RootNodeFilter},
12    store::{self, Store},
13    version_vector::VersionVector,
14};
15use camino::{Utf8Component, Utf8Path};
16
17#[derive(Clone)]
18pub struct Branch {
19    id: PublicKey,
20    store: Store,
21    keys: AccessKeys,
22    shared: BranchShared,
23    event_tx: EventSender,
24}
25
26impl Branch {
27    pub(crate) fn new(
28        id: PublicKey,
29        store: Store,
30        keys: AccessKeys,
31        shared: BranchShared,
32        event_tx: EventSender,
33    ) -> Self {
34        Self {
35            id,
36            store,
37            keys,
38            shared,
39            event_tx,
40        }
41    }
42
43    /// Binds the given event scope to this branch. Any event from this branch or any objects
44    /// belonging to it (files, directories) will be sent with this scope.
45    pub(crate) fn with_event_scope(self, event_scope: EventScope) -> Self {
46        Self {
47            event_tx: self.event_tx.with_scope(event_scope),
48            ..self
49        }
50    }
51
52    pub fn id(&self) -> &PublicKey {
53        &self.id
54    }
55
56    pub(crate) fn store(&self) -> &Store {
57        &self.store
58    }
59
60    pub async fn version_vector(&self) -> Result<VersionVector> {
61        match self.proof().await {
62            Ok(proof) => Ok(proof.into_version_vector()),
63            Err(Error::Store(store::Error::BranchNotFound)) => Ok(VersionVector::new()),
64            Err(error) => Err(error),
65        }
66    }
67
68    pub(crate) async fn proof(&self) -> Result<Proof> {
69        Ok(self
70            .store
71            .acquire_read()
72            .await?
73            .load_latest_approved_root_node(self.id(), RootNodeFilter::Any)
74            .await?
75            .proof)
76    }
77
78    pub(crate) fn keys(&self) -> &AccessKeys {
79        &self.keys
80    }
81
82    pub(crate) async fn open_root(
83        &self,
84        locking: DirectoryLocking,
85        fallback: DirectoryFallback,
86    ) -> Result<Directory> {
87        Directory::open_root(self.clone(), locking, fallback).await
88    }
89
90    pub(crate) async fn open_or_create_root(&self) -> Result<Directory> {
91        Directory::open_or_create_root(self.clone(), VersionVector::new()).await
92    }
93
94    /// Ensures that the directory at the specified path exists including all its ancestors.
95    /// Note: non-normalized paths (i.e. containing "..") or Windows-style drive prefixes
96    /// (e.g. "C:") are not supported.
97    pub(crate) async fn ensure_directory_exists(&self, path: &Utf8Path) -> Result<Directory> {
98        let mut curr = self.open_or_create_root().await?;
99
100        for component in path.components() {
101            match component {
102                Utf8Component::RootDir | Utf8Component::CurDir => (),
103                Utf8Component::Normal(name) => {
104                    let next = match curr.lookup(name) {
105                        Ok(EntryRef::Directory(entry)) => {
106                            Some(entry.open(DirectoryFallback::Disabled).await?)
107                        }
108                        Ok(EntryRef::File(_)) => return Err(Error::EntryIsFile),
109                        Ok(EntryRef::Tombstone(_)) | Err(Error::EntryNotFound) => None,
110                        Err(error) => return Err(error),
111                    };
112
113                    let next = if let Some(next) = next {
114                        next
115                    } else {
116                        curr.create_directory(
117                            name.to_string(),
118                            rand::random(),
119                            &VersionVector::new(),
120                        )
121                        .await?
122                    };
123
124                    curr = next;
125                }
126                Utf8Component::Prefix(_) | Utf8Component::ParentDir => {
127                    return Err(Error::OperationNotSupported);
128                }
129            }
130        }
131
132        Ok(curr)
133    }
134
135    pub(crate) async fn ensure_file_exists(&self, path: &Utf8Path) -> Result<File> {
136        let (parent, name) = path::decompose(path).ok_or(Error::EntryIsDirectory)?;
137        self.ensure_directory_exists(parent)
138            .await?
139            .create_file(name.to_string())
140            .await
141    }
142
143    pub(crate) async fn root_block_id(&self) -> Result<BlockId> {
144        let (block_id, _) = self
145            .store
146            .begin_read()
147            .await?
148            .find_block(self.id(), &Locator::ROOT.encode(self.keys().read()))
149            .await?;
150
151        Ok(block_id)
152    }
153
154    pub(crate) fn locker(&self) -> BranchLocker {
155        self.shared.locker.branch(*self.id())
156    }
157
158    pub(crate) fn notify(&self) -> BranchEventSender {
159        BranchEventSender {
160            event_tx: self.event_tx.clone(),
161            branch_id: *self.id(),
162        }
163    }
164
165    pub async fn debug_print(&self, print: DebugPrinter) {
166        match self
167            .open_root(DirectoryLocking::Disabled, DirectoryFallback::Disabled)
168            .await
169        {
170            Ok(root) => root.debug_print(print).await,
171            Err(error) => print.display(&format_args!("failed to open root directory: {error:?}")),
172        }
173    }
174
175    #[cfg(test)]
176    pub(crate) fn reopen(self, keys: AccessKeys) -> Self {
177        Self { keys, ..self }
178    }
179
180    /// Clones (the latest snapshot of) this branch into another branch and returns that branch.
181    #[cfg(test)]
182    pub(crate) async fn clone_into(&self, dst_id: PublicKey) -> Result<Self> {
183        let mut tx = self.store().begin_write().await?;
184
185        match tx
186            .load_latest_approved_root_node(self.id(), RootNodeFilter::Any)
187            .await
188        {
189            Ok(node) => {
190                tx.clone_root_node_into(
191                    node,
192                    dst_id,
193                    self.keys().write().ok_or(Error::PermissionDenied)?,
194                )
195                .await?;
196            }
197            Err(store::Error::BranchNotFound) => (),
198            Err(error) => return Err(error.into()),
199        }
200
201        tx.commit().await?;
202
203        Ok(Self {
204            id: dst_id,
205            ..self.clone()
206        })
207    }
208}
209
210/// State shared among all branches.
211#[derive(Clone)]
212pub(crate) struct BranchShared {
213    pub locker: Locker,
214}
215
216impl BranchShared {
217    pub fn new() -> Self {
218        Self {
219            locker: Locker::new(),
220        }
221    }
222}
223
224/// Sender to send event notification for the given branch.
225#[derive(Clone)]
226pub(crate) struct BranchEventSender {
227    event_tx: EventSender,
228    branch_id: PublicKey,
229}
230
231impl BranchEventSender {
232    pub fn send(&self) {
233        self.event_tx
234            .send(Payload::SnapshotApproved(self.branch_id));
235    }
236}
237
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{access_control::WriteSecrets, blob::BlobId, db, event::EventSender};
    use assert_matches::assert_matches;
    use tempfile::TempDir;

    /// Ensuring "/" exists must yield the root directory itself.
    #[tokio::test(flavor = "multi_thread")]
    async fn ensure_root_directory_exists() {
        let (_base_dir, branch) = setup().await;

        let root = branch
            .ensure_directory_exists(Utf8Path::new("/"))
            .await
            .unwrap();

        assert_eq!(root.blob_id(), &BlobId::ROOT);
    }

    /// A directory created through `ensure_directory_exists` must be visible from a root
    /// directory handle that was opened earlier, once that handle is refreshed.
    #[tokio::test(flavor = "multi_thread")]
    async fn ensure_subdirectory_exists() {
        let (_base_dir, branch) = setup().await;

        let mut root = branch.open_or_create_root().await.unwrap();

        let subdir_path = Utf8Path::new("/dir");
        branch.ensure_directory_exists(subdir_path).await.unwrap();

        root.refresh().await.unwrap();
        let _ = root.lookup("dir").unwrap();
    }

    /// Flushing modifications to a file opened through a read-only branch must fail with
    /// `PermissionDenied`.
    #[tokio::test(flavor = "multi_thread")]
    async fn attempt_to_modify_file_on_read_only_branch() {
        let (_base_dir, branch) = setup().await;

        // Create the file while the branch still has write access.
        let mut writable_file = branch
            .open_or_create_root()
            .await
            .unwrap()
            .create_file(String::from("test.txt"))
            .await
            .unwrap();
        writable_file.write_all(b"foo").await.unwrap();
        writable_file.flush().await.unwrap();
        drop(writable_file);

        // Reopen the same branch with read-only keys.
        let read_only_keys = branch.keys().clone().read_only();
        let branch = branch.reopen(read_only_keys);

        let mut file = branch
            .open_root(DirectoryLocking::Enabled, DirectoryFallback::Disabled)
            .await
            .unwrap()
            .lookup("test.txt")
            .unwrap()
            .file()
            .unwrap()
            .open()
            .await
            .unwrap();

        file.truncate(0).unwrap();
        assert_matches!(file.flush().await, Err(Error::PermissionDenied));

        file.write_all(b"bar").await.unwrap();
        assert_matches!(file.flush().await, Err(Error::PermissionDenied));
    }

    /// Creates a temporary database and a branch with random id and random write secrets on
    /// top of it. The returned `TempDir` must be kept alive for the duration of the test.
    async fn setup() -> (TempDir, Branch) {
        let (base_dir, pool) = db::create_temp().await.unwrap();

        let writer_id = PublicKey::random();
        let secrets = WriteSecrets::random();
        let event_tx = EventSender::new(1);

        let branch = Branch::new(
            writer_id,
            Store::new(pool),
            secrets.into(),
            BranchShared::new(),
            event_tx,
        );

        (base_dir, branch)
    }
}