1use crate::{
2 access_control::AccessKeys,
3 blob::lock::{BranchLocker, Locker},
4 crypto::sign::PublicKey,
5 debug::DebugPrinter,
6 directory::{Directory, DirectoryFallback, DirectoryLocking, EntryRef},
7 error::{Error, Result},
8 event::{EventScope, EventSender, Payload},
9 file::File,
10 path,
11 protocol::{BlockId, Locator, Proof, RootNodeFilter},
12 store::{self, Store},
13 version_vector::VersionVector,
14};
15use camino::{Utf8Component, Utf8Path};
16
17#[derive(Clone)]
18pub struct Branch {
19 id: PublicKey,
20 store: Store,
21 keys: AccessKeys,
22 shared: BranchShared,
23 event_tx: EventSender,
24}
25
26impl Branch {
27 pub(crate) fn new(
28 id: PublicKey,
29 store: Store,
30 keys: AccessKeys,
31 shared: BranchShared,
32 event_tx: EventSender,
33 ) -> Self {
34 Self {
35 id,
36 store,
37 keys,
38 shared,
39 event_tx,
40 }
41 }
42
43 pub(crate) fn with_event_scope(self, event_scope: EventScope) -> Self {
46 Self {
47 event_tx: self.event_tx.with_scope(event_scope),
48 ..self
49 }
50 }
51
52 pub fn id(&self) -> &PublicKey {
53 &self.id
54 }
55
56 pub(crate) fn store(&self) -> &Store {
57 &self.store
58 }
59
60 pub async fn version_vector(&self) -> Result<VersionVector> {
61 match self.proof().await {
62 Ok(proof) => Ok(proof.into_version_vector()),
63 Err(Error::Store(store::Error::BranchNotFound)) => Ok(VersionVector::new()),
64 Err(error) => Err(error),
65 }
66 }
67
68 pub(crate) async fn proof(&self) -> Result<Proof> {
69 Ok(self
70 .store
71 .acquire_read()
72 .await?
73 .load_latest_approved_root_node(self.id(), RootNodeFilter::Any)
74 .await?
75 .proof)
76 }
77
78 pub(crate) fn keys(&self) -> &AccessKeys {
79 &self.keys
80 }
81
82 pub(crate) async fn open_root(
83 &self,
84 locking: DirectoryLocking,
85 fallback: DirectoryFallback,
86 ) -> Result<Directory> {
87 Directory::open_root(self.clone(), locking, fallback).await
88 }
89
90 pub(crate) async fn open_or_create_root(&self) -> Result<Directory> {
91 Directory::open_or_create_root(self.clone(), VersionVector::new()).await
92 }
93
94 pub(crate) async fn ensure_directory_exists(&self, path: &Utf8Path) -> Result<Directory> {
98 let mut curr = self.open_or_create_root().await?;
99
100 for component in path.components() {
101 match component {
102 Utf8Component::RootDir | Utf8Component::CurDir => (),
103 Utf8Component::Normal(name) => {
104 let next = match curr.lookup(name) {
105 Ok(EntryRef::Directory(entry)) => {
106 Some(entry.open(DirectoryFallback::Disabled).await?)
107 }
108 Ok(EntryRef::File(_)) => return Err(Error::EntryIsFile),
109 Ok(EntryRef::Tombstone(_)) | Err(Error::EntryNotFound) => None,
110 Err(error) => return Err(error),
111 };
112
113 let next = if let Some(next) = next {
114 next
115 } else {
116 curr.create_directory(
117 name.to_string(),
118 rand::random(),
119 &VersionVector::new(),
120 )
121 .await?
122 };
123
124 curr = next;
125 }
126 Utf8Component::Prefix(_) | Utf8Component::ParentDir => {
127 return Err(Error::OperationNotSupported);
128 }
129 }
130 }
131
132 Ok(curr)
133 }
134
135 pub(crate) async fn ensure_file_exists(&self, path: &Utf8Path) -> Result<File> {
136 let (parent, name) = path::decompose(path).ok_or(Error::EntryIsDirectory)?;
137 self.ensure_directory_exists(parent)
138 .await?
139 .create_file(name.to_string())
140 .await
141 }
142
143 pub(crate) async fn root_block_id(&self) -> Result<BlockId> {
144 let (block_id, _) = self
145 .store
146 .begin_read()
147 .await?
148 .find_block(self.id(), &Locator::ROOT.encode(self.keys().read()))
149 .await?;
150
151 Ok(block_id)
152 }
153
154 pub(crate) fn locker(&self) -> BranchLocker {
155 self.shared.locker.branch(*self.id())
156 }
157
158 pub(crate) fn notify(&self) -> BranchEventSender {
159 BranchEventSender {
160 event_tx: self.event_tx.clone(),
161 branch_id: *self.id(),
162 }
163 }
164
165 pub async fn debug_print(&self, print: DebugPrinter) {
166 match self
167 .open_root(DirectoryLocking::Disabled, DirectoryFallback::Disabled)
168 .await
169 {
170 Ok(root) => root.debug_print(print).await,
171 Err(error) => print.display(&format_args!("failed to open root directory: {error:?}")),
172 }
173 }
174
175 #[cfg(test)]
176 pub(crate) fn reopen(self, keys: AccessKeys) -> Self {
177 Self { keys, ..self }
178 }
179
180 #[cfg(test)]
182 pub(crate) async fn clone_into(&self, dst_id: PublicKey) -> Result<Self> {
183 let mut tx = self.store().begin_write().await?;
184
185 match tx
186 .load_latest_approved_root_node(self.id(), RootNodeFilter::Any)
187 .await
188 {
189 Ok(node) => {
190 tx.clone_root_node_into(
191 node,
192 dst_id,
193 self.keys().write().ok_or(Error::PermissionDenied)?,
194 )
195 .await?;
196 }
197 Err(store::Error::BranchNotFound) => (),
198 Err(error) => return Err(error.into()),
199 }
200
201 tx.commit().await?;
202
203 Ok(Self {
204 id: dst_id,
205 ..self.clone()
206 })
207 }
208}
209
210#[derive(Clone)]
212pub(crate) struct BranchShared {
213 pub locker: Locker,
214}
215
216impl BranchShared {
217 pub fn new() -> Self {
218 Self {
219 locker: Locker::new(),
220 }
221 }
222}
223
224#[derive(Clone)]
226pub(crate) struct BranchEventSender {
227 event_tx: EventSender,
228 branch_id: PublicKey,
229}
230
231impl BranchEventSender {
232 pub fn send(&self) {
233 self.event_tx
234 .send(Payload::SnapshotApproved(self.branch_id));
235 }
236}
237
238#[cfg(test)]
239mod tests {
240 use super::*;
241 use crate::{access_control::WriteSecrets, blob::BlobId, db, event::EventSender};
242 use assert_matches::assert_matches;
243 use tempfile::TempDir;
244
245 #[tokio::test(flavor = "multi_thread")]
246 async fn ensure_root_directory_exists() {
247 let (_base_dir, branch) = setup().await;
248 let dir = branch.ensure_directory_exists("/".into()).await.unwrap();
249 assert_eq!(dir.blob_id(), &BlobId::ROOT);
250 }
251
252 #[tokio::test(flavor = "multi_thread")]
253 async fn ensure_subdirectory_exists() {
254 let (_base_dir, branch) = setup().await;
255
256 let mut root = branch.open_or_create_root().await.unwrap();
257
258 branch
259 .ensure_directory_exists(Utf8Path::new("/dir"))
260 .await
261 .unwrap();
262
263 root.refresh().await.unwrap();
264 let _ = root.lookup("dir").unwrap();
265 }
266
267 #[tokio::test(flavor = "multi_thread")]
268 async fn attempt_to_modify_file_on_read_only_branch() {
269 let (_base_dir, branch) = setup().await;
270
271 let mut file = branch
272 .open_or_create_root()
273 .await
274 .unwrap()
275 .create_file("test.txt".into())
276 .await
277 .unwrap();
278 file.write_all(b"foo").await.unwrap();
279 file.flush().await.unwrap();
280 drop(file);
281
282 let keys = branch.keys().clone().read_only();
283 let branch = branch.reopen(keys);
284
285 let mut file = branch
286 .open_root(DirectoryLocking::Enabled, DirectoryFallback::Disabled)
287 .await
288 .unwrap()
289 .lookup("test.txt")
290 .unwrap()
291 .file()
292 .unwrap()
293 .open()
294 .await
295 .unwrap();
296
297 file.truncate(0).unwrap();
298 assert_matches!(file.flush().await, Err(Error::PermissionDenied));
299
300 file.write_all(b"bar").await.unwrap();
301 assert_matches!(file.flush().await, Err(Error::PermissionDenied));
302 }
303
304 async fn setup() -> (TempDir, Branch) {
305 let (base_dir, pool) = db::create_temp().await.unwrap();
306
307 let writer_id = PublicKey::random();
308 let secrets = WriteSecrets::random();
309 let event_tx = EventSender::new(1);
310
311 let store = Store::new(pool);
312 let shared = BranchShared::new();
313 let branch = Branch::new(writer_id, store, secrets.into(), shared, event_tx);
314
315 (base_dir, branch)
316 }
317}