1use crate::{
2 access_control::AccessKeys,
3 blob::lock::{BranchLocker, Locker},
4 crypto::sign::PublicKey,
5 debug::DebugPrinter,
6 directory::{Directory, DirectoryFallback, DirectoryLocking, EntryRef},
7 error::{Error, Result},
8 event::{EventScope, EventSender, Payload},
9 file::File,
10 path,
11 protocol::{BlockId, Locator, Proof, RootNodeFilter},
12 store::{self, Store},
13 version_vector::VersionVector,
14};
15use camino::{Utf8Component, Utf8Path};
16
17#[derive(Clone)]
18pub struct Branch {
19 id: PublicKey,
20 store: Store,
21 keys: AccessKeys,
22 shared: BranchShared,
23 event_tx: EventSender,
24}
25
26impl Branch {
27 pub(crate) fn new(
28 id: PublicKey,
29 store: Store,
30 keys: AccessKeys,
31 shared: BranchShared,
32 event_tx: EventSender,
33 ) -> Self {
34 Self {
35 id,
36 store,
37 keys,
38 shared,
39 event_tx,
40 }
41 }
42
43 pub(crate) fn with_event_scope(self, event_scope: EventScope) -> Self {
46 Self {
47 event_tx: self.event_tx.with_scope(event_scope),
48 ..self
49 }
50 }
51
52 pub fn id(&self) -> &PublicKey {
53 &self.id
54 }
55
56 pub(crate) fn store(&self) -> &Store {
57 &self.store
58 }
59
60 pub async fn version_vector(&self) -> Result<VersionVector> {
61 match self.proof().await {
62 Ok(proof) => Ok(proof.into_version_vector()),
63 Err(Error::Store(store::Error::BranchNotFound)) => Ok(VersionVector::new()),
64 Err(error) => Err(error),
65 }
66 }
67
68 pub(crate) async fn proof(&self) -> Result<Proof> {
69 Ok(self
70 .store
71 .acquire_read()
72 .await?
73 .load_latest_approved_root_node(self.id(), RootNodeFilter::Any)
74 .await?
75 .proof)
76 }
77
78 pub(crate) fn keys(&self) -> &AccessKeys {
79 &self.keys
80 }
81
82 pub(crate) async fn open_root(
83 &self,
84 locking: DirectoryLocking,
85 fallback: DirectoryFallback,
86 ) -> Result<Directory> {
87 Directory::open_root(self.clone(), locking, fallback).await
88 }
89
90 pub(crate) async fn open_or_create_root(&self) -> Result<Directory> {
91 Directory::open_or_create_root(self.clone(), VersionVector::new()).await
92 }
93
94 pub(crate) async fn ensure_directory_exists(&self, path: &Utf8Path) -> Result<Directory> {
98 let mut curr = self.open_or_create_root().await?;
99
100 for component in path.components() {
101 match component {
102 Utf8Component::RootDir | Utf8Component::CurDir => (),
103 Utf8Component::Normal(name) => {
104 let next = match curr.lookup(name) {
105 Ok(EntryRef::Directory(entry)) => {
106 Some(entry.open(DirectoryFallback::Disabled).await?)
107 }
108 Ok(EntryRef::File(_)) => return Err(Error::EntryIsFile),
109 Ok(EntryRef::Tombstone(_)) | Err(Error::EntryNotFound) => None,
110 Err(error) => return Err(error),
111 };
112
113 let next = if let Some(next) = next {
114 next
115 } else {
116 curr.create_directory(
117 name.to_string(),
118 rand::random(),
119 &VersionVector::new(),
120 )
121 .await?
122 };
123
124 curr = next;
125 }
126 Utf8Component::Prefix(_) | Utf8Component::ParentDir => {
127 return Err(Error::OperationNotSupported)
128 }
129 }
130 }
131
132 Ok(curr)
133 }
134
135 pub(crate) async fn ensure_file_exists(&self, path: &Utf8Path) -> Result<File> {
136 let (parent, name) = path::decompose(path).ok_or(Error::EntryIsDirectory)?;
137 self.ensure_directory_exists(parent)
138 .await?
139 .create_file(name.to_string())
140 .await
141 }
142
143 pub(crate) async fn root_block_id(&self) -> Result<BlockId> {
144 let (block_id, _) = self
145 .store
146 .begin_read()
147 .await?
148 .find_block(self.id(), &Locator::ROOT.encode(self.keys().read()))
149 .await?;
150
151 Ok(block_id)
152 }
153
154 pub(crate) fn locker(&self) -> BranchLocker {
155 self.shared.locker.branch(*self.id())
156 }
157
158 pub(crate) fn notify(&self) -> BranchEventSender {
159 BranchEventSender {
160 event_tx: self.event_tx.clone(),
161 branch_id: *self.id(),
162 }
163 }
164
165 pub async fn debug_print(&self, print: DebugPrinter) {
166 match self
167 .open_root(DirectoryLocking::Disabled, DirectoryFallback::Disabled)
168 .await
169 {
170 Ok(root) => root.debug_print(print).await,
171 Err(error) => {
172 print.display(&format_args!("failed to open root directory: {error:?}"))
173 }
174 }
175 }
176
177 #[cfg(test)]
178 pub(crate) fn reopen(self, keys: AccessKeys) -> Self {
179 Self { keys, ..self }
180 }
181
182 #[cfg(test)]
184 pub(crate) async fn clone_into(&self, dst_id: PublicKey) -> Result<Self> {
185 let mut tx = self.store().begin_write().await?;
186
187 match tx
188 .load_latest_approved_root_node(self.id(), RootNodeFilter::Any)
189 .await
190 {
191 Ok(node) => {
192 tx.clone_root_node_into(
193 node,
194 dst_id,
195 self.keys().write().ok_or(Error::PermissionDenied)?,
196 )
197 .await?;
198 }
199 Err(store::Error::BranchNotFound) => (),
200 Err(error) => return Err(error.into()),
201 }
202
203 tx.commit().await?;
204
205 Ok(Self {
206 id: dst_id,
207 ..self.clone()
208 })
209 }
210}
211
212#[derive(Clone)]
214pub(crate) struct BranchShared {
215 pub locker: Locker,
216}
217
218impl BranchShared {
219 pub fn new() -> Self {
220 Self {
221 locker: Locker::new(),
222 }
223 }
224}
225
226#[derive(Clone)]
228pub(crate) struct BranchEventSender {
229 event_tx: EventSender,
230 branch_id: PublicKey,
231}
232
233impl BranchEventSender {
234 pub fn send(&self) {
235 self.event_tx
236 .send(Payload::SnapshotApproved(self.branch_id));
237 }
238}
239
240#[cfg(test)]
241mod tests {
242 use super::*;
243 use crate::{access_control::WriteSecrets, blob::BlobId, db, event::EventSender};
244 use assert_matches::assert_matches;
245 use tempfile::TempDir;
246
247 #[tokio::test(flavor = "multi_thread")]
248 async fn ensure_root_directory_exists() {
249 let (_base_dir, branch) = setup().await;
250 let dir = branch.ensure_directory_exists("/".into()).await.unwrap();
251 assert_eq!(dir.blob_id(), &BlobId::ROOT);
252 }
253
254 #[tokio::test(flavor = "multi_thread")]
255 async fn ensure_subdirectory_exists() {
256 let (_base_dir, branch) = setup().await;
257
258 let mut root = branch.open_or_create_root().await.unwrap();
259
260 branch
261 .ensure_directory_exists(Utf8Path::new("/dir"))
262 .await
263 .unwrap();
264
265 root.refresh().await.unwrap();
266 let _ = root.lookup("dir").unwrap();
267 }
268
269 #[tokio::test(flavor = "multi_thread")]
270 async fn attempt_to_modify_file_on_read_only_branch() {
271 let (_base_dir, branch) = setup().await;
272
273 let mut file = branch
274 .open_or_create_root()
275 .await
276 .unwrap()
277 .create_file("test.txt".into())
278 .await
279 .unwrap();
280 file.write_all(b"foo").await.unwrap();
281 file.flush().await.unwrap();
282 drop(file);
283
284 let keys = branch.keys().clone().read_only();
285 let branch = branch.reopen(keys);
286
287 let mut file = branch
288 .open_root(DirectoryLocking::Enabled, DirectoryFallback::Disabled)
289 .await
290 .unwrap()
291 .lookup("test.txt")
292 .unwrap()
293 .file()
294 .unwrap()
295 .open()
296 .await
297 .unwrap();
298
299 file.truncate(0).unwrap();
300 assert_matches!(file.flush().await, Err(Error::PermissionDenied));
301
302 file.write_all(b"bar").await.unwrap();
303 assert_matches!(file.flush().await, Err(Error::PermissionDenied));
304 }
305
306 async fn setup() -> (TempDir, Branch) {
307 let (base_dir, pool) = db::create_temp().await.unwrap();
308
309 let writer_id = PublicKey::random();
310 let secrets = WriteSecrets::random();
311 let event_tx = EventSender::new(1);
312
313 let store = Store::new(pool);
314 let shared = BranchShared::new();
315 let branch = Branch::new(writer_id, store, secrets.into(), shared, event_tx);
316
317 (base_dir, branch)
318 }
319}