1use crate::{
2 blob::{lock::UpgradableLock, Blob, BlockIds, ReadWriteError},
3 branch::Branch,
4 directory::{Directory, ParentContext},
5 error::{Error, Result},
6 protocol::{Bump, Locator, SingleBlockPresence, BLOCK_SIZE},
7 store::{Changeset, ReadTransaction},
8 version_vector::VersionVector,
9};
10use std::{fmt, io::SeekFrom};
11use tokio::io::{AsyncWrite, AsyncWriteExt};
12
13pub struct File {
14 blob: Blob,
15 parent: ParentContext,
16 lock: UpgradableLock,
17}
18
19impl File {
20 pub(crate) async fn open(
22 branch: Branch,
23 locator: Locator,
24 parent: ParentContext,
25 ) -> Result<Self> {
26 let lock = branch.locker().read(*locator.blob_id()).await;
27 let lock = UpgradableLock::Read(lock);
28
29 let mut tx = branch.store().begin_read().await?;
30
31 Ok(Self {
32 blob: Blob::open(&mut tx, branch, *locator.blob_id()).await?,
33 parent,
34 lock,
35 })
36 }
37
38 pub(crate) fn create(branch: Branch, locator: Locator, parent: ParentContext) -> Self {
40 let lock = branch
45 .locker()
46 .try_read(*locator.blob_id())
47 .ok()
48 .expect("blob_id collision");
49 let lock = UpgradableLock::Read(lock);
50
51 Self {
52 blob: Blob::create(branch, *locator.blob_id()),
53 parent,
54 lock,
55 }
56 }
57
58 pub fn branch(&self) -> &Branch {
59 self.blob.branch()
60 }
61
62 pub async fn parent(&self) -> Result<Directory> {
63 self.parent.open(self.branch().clone()).await
64 }
65
66 #[allow(clippy::len_without_is_empty)]
68 pub fn len(&self) -> u64 {
69 self.blob.len()
70 }
71
72 pub fn progress(&self) -> impl Future<Output = Result<u64>> + 'static {
77 let branch = self.branch().clone();
78 let blob_id = *self.blob.id();
79 let len = self.len();
80
81 async move {
82 let mut block_ids = BlockIds::open(branch, blob_id).await?;
83 let mut present = 0;
84
85 while let Some((_, block_presence)) = block_ids.try_next().await? {
86 match block_presence {
87 SingleBlockPresence::Present => {
88 present += 1;
89 }
90 SingleBlockPresence::Missing | SingleBlockPresence::Expired => (),
91 }
92 }
93
94 Ok((present as u64 * BLOCK_SIZE as u64).min(len))
95 }
96 }
97
98 pub async fn read(&mut self, buffer: &mut [u8]) -> Result<usize> {
100 loop {
101 match self.blob.read(buffer) {
102 Ok(len) => return Ok(len),
103 Err(ReadWriteError::CacheMiss) => {
104 let mut tx = self.branch().store().begin_read().await?;
105 self.blob.warmup(&mut tx).await?;
106 }
107 Err(ReadWriteError::CacheFull) => {
108 self.flush().await?;
109 }
110 }
111 }
112 }
113
114 pub async fn read_all(&mut self, buffer: &mut [u8]) -> Result<usize> {
115 let mut offset = 0;
116
117 loop {
118 match self.read(&mut buffer[offset..]).await? {
119 0 => return Ok(offset),
120 n => {
121 offset += n;
122 }
123 }
124 }
125 }
126
127 pub async fn read_to_end(&mut self) -> Result<Vec<u8>> {
130 let mut buffer = vec![
131 0;
132 (self.blob.len() - self.blob.seek_position())
133 .try_into()
134 .unwrap_or(usize::MAX)
135 ];
136 self.read_all(&mut buffer[..]).await?;
137 Ok(buffer)
138 }
139
140 pub async fn write(&mut self, buffer: &[u8]) -> Result<usize> {
142 self.acquire_write_lock()?;
143
144 loop {
145 match self.blob.write(buffer) {
146 Ok(len) => return Ok(len),
147 Err(ReadWriteError::CacheMiss) => {
148 let mut tx = self.branch().store().begin_read().await?;
149 self.blob.warmup(&mut tx).await?;
150 }
151 Err(ReadWriteError::CacheFull) => {
152 self.flush().await?;
153 }
154 }
155 }
156 }
157
158 pub async fn write_all(&mut self, buffer: &[u8]) -> Result<()> {
159 let mut offset = 0;
160
161 loop {
162 match self.write(&buffer[offset..]).await? {
163 0 => return Ok(()),
164 n => {
165 offset += n;
166 }
167 }
168 }
169 }
170
171 pub fn seek(&mut self, pos: SeekFrom) -> u64 {
173 self.blob.seek(pos)
174 }
175
176 pub fn truncate(&mut self, len: u64) -> Result<()> {
178 self.acquire_write_lock()?;
179 self.blob.truncate(len)
180 }
181
182 pub async fn flush(&mut self) -> Result<()> {
185 if !self.blob.is_dirty() {
186 return Ok(());
187 }
188
189 let mut tx = self.branch().store().begin_write().await?;
190 let mut changeset = Changeset::new();
191
192 self.blob.flush(&mut tx, &mut changeset).await?;
193 self.parent
194 .bump(
195 &mut tx,
196 &mut changeset,
197 self.branch().clone(),
198 Bump::increment(*self.branch().id()),
199 )
200 .await?;
201
202 changeset
203 .apply(
204 &mut tx,
205 self.branch().id(),
206 self.branch()
207 .keys()
208 .write()
209 .ok_or(Error::PermissionDenied)?,
210 )
211 .await?;
212
213 let event_tx = self.branch().notify();
214 tx.commit_and_then(move || event_tx.send()).await?;
215
216 Ok(())
217 }
218
219 pub(crate) async fn save(
222 &mut self,
223 tx: &mut ReadTransaction,
224 changeset: &mut Changeset,
225 ) -> Result<()> {
226 self.blob.flush(tx, changeset).await?;
227 Ok(())
228 }
229
230 pub async fn copy_to_writer<W: AsyncWrite + Unpin>(&mut self, dst: &mut W) -> Result<()> {
233 let mut buffer = vec![0; BLOCK_SIZE];
234
235 loop {
236 let len = self.read(&mut buffer).await?;
237
238 dst.write_all(&buffer[..len]).await.map_err(Error::Writer)?;
239
240 if len < buffer.len() {
241 break;
242 }
243 }
244
245 Ok(())
246 }
247
248 pub async fn fork(&mut self, dst_branch: Branch) -> Result<()> {
251 if self.branch().id() == dst_branch.id() {
252 return Ok(());
255 }
256
257 let parent = self.parent.fork(self.branch(), &dst_branch).await?;
258
259 let lock = dst_branch.locker().read(*self.blob.id()).await;
260 let lock = UpgradableLock::Read(lock);
261
262 let blob = {
263 let mut tx = dst_branch.store().begin_read().await?;
264 Blob::open(&mut tx, dst_branch, *self.blob.id()).await?
265 };
266
267 *self = Self { blob, parent, lock };
268
269 Ok(())
270 }
271
272 pub async fn version_vector(&self) -> Result<VersionVector> {
273 self.parent
274 .entry_version_vector(self.branch().clone())
275 .await
276 }
277
/// Returns the id of the underlying blob. Only available in tests.
#[cfg(test)]
pub(crate) fn blob_id(&self) -> &crate::blob::BlobId {
    let Self { blob, .. } = self;
    blob.id()
}
283
284 fn acquire_write_lock(&mut self) -> Result<()> {
285 self.lock.upgrade().then_some(()).ok_or(Error::Locked)
286 }
287}
288
289impl fmt::Debug for File {
290 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
291 f.debug_struct("File")
292 .field("blob_id", &self.blob.id())
293 .field("branch", &self.blob.branch().id())
294 .finish()
295 }
296}
297
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        access_control::{AccessKeys, WriteSecrets},
        branch::BranchShared,
        crypto::sign::PublicKey,
        db,
        directory::{DirectoryFallback, DirectoryLocking},
        event::EventSender,
        store::Store,
        test_utils,
    };
    use assert_matches::assert_matches;
    use tempfile::TempDir;

    /// A file forked into another branch can be modified there without affecting the original.
    #[tokio::test(flavor = "multi_thread")]
    async fn fork() {
        test_utils::init_log();
        let (_base_dir, [branch0, branch1]) = setup().await;

        // Create the file in branch 0 and persist its content.
        let mut file0 = branch0.ensure_file_exists("dog.jpg".into()).await.unwrap();
        file0.write_all(b"small").await.unwrap();
        file0.flush().await.unwrap();
        drop(file0);

        // Reopen it, fork it into branch 1 and modify the fork.
        let mut file1 = open_file(&branch0, "dog.jpg").await;
        file1.fork(branch1.clone()).await.unwrap();
        file1.write_all(b"large").await.unwrap();
        file1.flush().await.unwrap();

        // The original in branch 0 is unchanged...
        let mut original = open_file(&branch0, "dog.jpg").await;
        assert_eq!(original.read_to_end().await.unwrap(), b"small");

        // ...while the fork in branch 1 has the new content.
        let mut forked = open_file(&branch1, "dog.jpg").await;
        assert_eq!(forked.read_to_end().await.unwrap(), b"large");
    }

    /// Writing to and flushing a forked file more than once must succeed.
    #[tokio::test(flavor = "multi_thread")]
    async fn multiple_consecutive_modifications_of_forked_file() {
        let (_base_dir, [branch0, branch1]) = setup().await;

        let mut file0 = branch0.ensure_file_exists("/pig.jpg".into()).await.unwrap();
        file0.flush().await.unwrap();

        let mut file1 = open_file(&branch0, "pig.jpg").await;
        file1.fork(branch1).await.unwrap();

        for _ in 0..2 {
            file1.write_all(b"oink").await.unwrap();
            file1.flush().await.unwrap();
        }
    }

    /// While one handle is writing, a second handle to the same file can't modify it.
    #[tokio::test(flavor = "multi_thread")]
    async fn concurrent_writes() {
        let (_base_dir, [branch]) = setup().await;

        let mut file0 = branch.ensure_file_exists("fox.txt".into()).await.unwrap();
        let mut file1 = open_file(&branch, "fox.txt").await;

        file0.write_all(b"yip-yap").await.unwrap();

        assert_matches!(file1.write_all(b"ring-ding-ding").await, Err(Error::Locked));
        assert_matches!(file1.truncate(0), Err(Error::Locked));
    }

    /// The content copied into an external writer matches the content of the file.
    #[tokio::test(flavor = "multi_thread")]
    async fn copy_to_writer() {
        use tokio::{fs, io::AsyncReadExt};

        let (base_dir, [branch]) = setup().await;
        let src_content = b"hello world";

        let mut src = branch.ensure_file_exists("src.txt".into()).await.unwrap();
        src.write_all(src_content).await.unwrap();

        let dst_path = base_dir.path().join("dst.txt");

        // Copy from the start of the source into a regular file on disk.
        let mut dst = fs::File::create(&dst_path).await.unwrap();
        src.seek(SeekFrom::Start(0));
        src.copy_to_writer(&mut dst).await.unwrap();
        dst.sync_all().await.unwrap();
        drop(dst);

        // Read the regular file back and compare.
        let mut dst = fs::File::open(&dst_path).await.unwrap();
        let mut dst_content = Vec::new();
        dst.read_to_end(&mut dst_content).await.unwrap();

        assert_eq!(dst_content, src_content);
    }

    /// Opens the file called `name` located in the root directory of `branch`.
    async fn open_file(branch: &Branch, name: &str) -> File {
        branch
            .open_root(DirectoryLocking::Enabled, DirectoryFallback::Disabled)
            .await
            .unwrap()
            .lookup(name)
            .unwrap()
            .file()
            .unwrap()
            .open()
            .await
            .unwrap()
    }

    /// Creates a temporary database and `N` branches that all share the same store, access
    /// keys, event sender and shared branch state. The returned `TempDir` must be kept alive
    /// for as long as the branches are used.
    async fn setup<const N: usize>() -> (TempDir, [Branch; N]) {
        let (base_dir, pool) = db::create_temp().await.unwrap();
        let store = Store::new(pool);
        let keys = AccessKeys::from(WriteSecrets::random());
        let event_tx = EventSender::new(1);
        let shared = BranchShared::new();

        let new_branch = |_| {
            create_branch(
                store.clone(),
                event_tx.clone(),
                keys.clone(),
                shared.clone(),
            )
        };

        (base_dir, [(); N].map(new_branch))
    }

    /// Creates a branch with a random id on top of the given shared resources.
    fn create_branch(
        store: Store,
        event_tx: EventSender,
        keys: AccessKeys,
        shared: BranchShared,
    ) -> Branch {
        Branch::new(PublicKey::random(), store, keys, shared, event_tx)
    }
}