ouisync/file/mod.rs

use crate::{
    blob::{lock::UpgradableLock, Blob, BlockIds, ReadWriteError},
    branch::Branch,
    directory::{Directory, ParentContext},
    error::{Error, Result},
    protocol::{Bump, Locator, SingleBlockPresence, BLOCK_SIZE},
    store::{Changeset, ReadTransaction},
    version_vector::VersionVector,
};
use std::{fmt, future::Future, io::SeekFrom};
use tokio::io::{AsyncWrite, AsyncWriteExt};

pub struct File {
    blob: Blob,
    parent: ParentContext,
    lock: UpgradableLock,
}

impl File {
    /// Opens an existing file.
    pub(crate) async fn open(
        branch: Branch,
        locator: Locator,
        parent: ParentContext,
    ) -> Result<Self> {
        let lock = branch.locker().read(*locator.blob_id()).await;
        let lock = UpgradableLock::Read(lock);

        let mut tx = branch.store().begin_read().await?;

        Ok(Self {
            blob: Blob::open(&mut tx, branch, *locator.blob_id()).await?,
            parent,
            lock,
        })
    }

    /// Creates a new file.
    pub(crate) fn create(branch: Branch, locator: Locator, parent: ParentContext) -> Self {
        // The only way this could fail is if there is already another locked blob with the same id
        // in the same branch. But the blob id is randomly generated, so this would imply we
        // generated the same id more than once, which is so astronomically unlikely that we might
        // as well panic.
        let lock = branch
            .locker()
            .try_read(*locator.blob_id())
            .ok()
            .expect("blob_id collision");
        let lock = UpgradableLock::Read(lock);

        Self {
            blob: Blob::create(branch, *locator.blob_id()),
            parent,
            lock,
        }
    }

    pub fn branch(&self) -> &Branch {
        self.blob.branch()
    }

    pub async fn parent(&self) -> Result<Directory> {
        self.parent.open(self.branch().clone()).await
    }

    /// Length of this file in bytes.
    #[allow(clippy::len_without_is_empty)]
    pub fn len(&self) -> u64 {
        self.blob.len()
    }

    /// Sync progress of this file, that is, how much of this file (in bytes) is available locally.
    ///
    /// NOTE: The future returned from this function doesn't borrow from `self`, so it's possible
    /// to drop `self` before/while awaiting it. This is useful to avoid holding the file lock
    /// while awaiting the result.
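    ///
    /// A possible usage pattern (sketch only):
    ///
    /// ```ignore
    /// let progress = file.progress();
    /// drop(file); // the file lock is released here
    /// let bytes_available = progress.await?;
    /// ```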
    pub fn progress(&self) -> impl Future<Output = Result<u64>> + 'static {
        let branch = self.branch().clone();
        let blob_id = *self.blob.id();
        let len = self.len();

        async move {
            let mut block_ids = BlockIds::open(branch, blob_id).await?;
            let mut present = 0;

            while let Some((_, block_presence)) = block_ids.try_next().await? {
                match block_presence {
                    SingleBlockPresence::Present => {
                        present += 1;
                    }
                    SingleBlockPresence::Missing | SingleBlockPresence::Expired => (),
                }
            }

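            // Count whole present blocks, but cap the result at the file length because the last
            // block is typically only partially used.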
            Ok((present as u64 * BLOCK_SIZE as u64).min(len))
        }
    }

    /// Reads data from this file into `buffer`. Returns the number of bytes actually read, which
    /// may be less than `buffer.len()`. Use `read_all` to fill the whole buffer.
    pub async fn read(&mut self, buffer: &mut [u8]) -> Result<usize> {
        loop {
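            // `Blob::read` is non-async and can only serve data from blocks already loaded in
            // memory: on `CacheMiss`, load the missing block within a read transaction and retry;
            // on `CacheFull`, flush to make room and retry.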
            match self.blob.read(buffer) {
                Ok(len) => return Ok(len),
                Err(ReadWriteError::CacheMiss) => {
                    let mut tx = self.branch().store().begin_read().await?;
                    self.blob.warmup(&mut tx).await?;
                }
                Err(ReadWriteError::CacheFull) => {
                    self.flush().await?;
                }
            }
        }
    }

    /// Reads data from this file into `buffer` until either the buffer is full or the end of the
    /// file is reached. Returns the number of bytes actually read.
    pub async fn read_all(&mut self, buffer: &mut [u8]) -> Result<usize> {
        let mut offset = 0;

        loop {
            match self.read(&mut buffer[offset..]).await? {
                0 => return Ok(offset),
                n => {
                    offset += n;
                }
            }
        }
    }

    /// Reads all data from this file from the current seek position until the end and returns it
    /// in a `Vec`.
    pub async fn read_to_end(&mut self) -> Result<Vec<u8>> {
        let mut buffer = vec![
            0;
            (self.blob.len() - self.blob.seek_position())
                .try_into()
                .unwrap_or(usize::MAX)
        ];
        self.read_all(&mut buffer[..]).await?;
        Ok(buffer)
    }

    /// Writes `buffer` into this file. Returns the number of bytes actually written.
    pub async fn write(&mut self, buffer: &[u8]) -> Result<usize> {
        self.acquire_write_lock()?;

        loop {
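            // Same recovery strategy as in `read`: warm up the cache on `CacheMiss`, flush on
            // `CacheFull`, then retry.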
            match self.blob.write(buffer) {
                Ok(len) => return Ok(len),
                Err(ReadWriteError::CacheMiss) => {
                    let mut tx = self.branch().store().begin_read().await?;
                    self.blob.warmup(&mut tx).await?;
                }
                Err(ReadWriteError::CacheFull) => {
                    self.flush().await?;
                }
            }
        }
    }

    /// Writes the whole `buffer` into this file.
    pub async fn write_all(&mut self, buffer: &[u8]) -> Result<()> {
        let mut offset = 0;

        loop {
            match self.write(&buffer[offset..]).await? {
                0 => return Ok(()),
                n => {
                    offset += n;
                }
            }
        }
    }

    /// Seeks to an offset in the file.
    pub fn seek(&mut self, pos: SeekFrom) -> u64 {
        self.blob.seek(pos)
    }

    /// Truncates the file to the given length.
    pub fn truncate(&mut self, len: u64) -> Result<()> {
        self.acquire_write_lock()?;
        self.blob.truncate(len)
    }

    /// Atomically saves any pending modifications and updates the version vectors of this file and
    /// all its ancestors.
    pub async fn flush(&mut self) -> Result<()> {
        if !self.blob.is_dirty() {
            return Ok(());
        }

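        // Stage the blob's dirty blocks together with the version vector bumps of this file and
        // its ancestor directories in a single changeset and apply it in one write transaction,
        // so the whole update is atomic.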
        let mut tx = self.branch().store().begin_write().await?;
        let mut changeset = Changeset::new();

        self.blob.flush(&mut tx, &mut changeset).await?;
        self.parent
            .bump(
                &mut tx,
                &mut changeset,
                self.branch().clone(),
                Bump::increment(*self.branch().id()),
            )
            .await?;

        changeset
            .apply(
                &mut tx,
                self.branch().id(),
                self.branch()
                    .keys()
                    .write()
                    .ok_or(Error::PermissionDenied)?,
            )
            .await?;

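        // Notify subscribers only after the transaction has been committed.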
        let event_tx = self.branch().notify();
        tx.commit_and_then(move || event_tx.send()).await?;

        Ok(())
    }

    /// Saves any pending modifications but does not update the version vectors. For internal use
    /// only.
    pub(crate) async fn save(
        &mut self,
        tx: &mut ReadTransaction,
        changeset: &mut Changeset,
    ) -> Result<()> {
        self.blob.flush(tx, changeset).await?;
        Ok(())
    }

    /// Copies the entire contents of this file into the provided writer (e.g. a file on a regular
    /// filesystem).
    pub async fn copy_to_writer<W: AsyncWrite + Unpin>(&mut self, dst: &mut W) -> Result<()> {
        let mut buffer = vec![0; BLOCK_SIZE];

        loop {
            let len = self.read_all(&mut buffer).await?;

            dst.write_all(&buffer[..len]).await.map_err(Error::Writer)?;

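            // `read_all` fills the buffer completely except at the end of the file, so a short
            // read means we are done.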
            if len < buffer.len() {
                break;
            }
        }

        Ok(())
    }

    /// Forks this file into the given branch. Ensures all its ancestor directories exist and live
    /// in the branch as well. Should be called before any mutable operation.
    pub async fn fork(&mut self, dst_branch: Branch) -> Result<()> {
        if self.branch().id() == dst_branch.id() {
            // File already lives in the local branch. We assume the ancestor directories have
            // already been created as well, so there is nothing else to do.
            return Ok(());
        }

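        // Fork the parent directory entry (and, transitively, the ancestor directories) into the
        // destination branch, then reopen the blob there.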
        let parent = self.parent.fork(self.branch(), &dst_branch).await?;

        let lock = dst_branch.locker().read(*self.blob.id()).await;
        let lock = UpgradableLock::Read(lock);

        let blob = {
            let mut tx = dst_branch.store().begin_read().await?;
            Blob::open(&mut tx, dst_branch, *self.blob.id()).await?
        };

        *self = Self { blob, parent, lock };

        Ok(())
    }

    /// Version vector of this file.
    pub async fn version_vector(&self) -> Result<VersionVector> {
        self.parent
            .entry_version_vector(self.branch().clone())
            .await
    }

    /// BlobId of this file.
    #[cfg(test)]
    pub(crate) fn blob_id(&self) -> &crate::blob::BlobId {
        self.blob.id()
    }

    fn acquire_write_lock(&mut self) -> Result<()> {
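        // Upgrading fails if some other instance of this file already holds the write lock, in
        // which case we report `Error::Locked`.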
        self.lock.upgrade().then_some(()).ok_or(Error::Locked)
    }
}

impl fmt::Debug for File {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("File")
            .field("blob_id", &self.blob.id())
            .field("branch", &self.blob.branch().id())
            .finish()
    }
}
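
// A typical write path, mirroring the tests below (sketch only; `ensure_file_exists` is the
// helper the tests use to create a file in a branch):
//
//     let mut file = branch.ensure_file_exists("example.txt".into()).await?;
//     file.write_all(b"hello").await?;
//     file.flush().await?;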

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        access_control::{AccessKeys, WriteSecrets},
        branch::BranchShared,
        crypto::sign::PublicKey,
        db,
        directory::{DirectoryFallback, DirectoryLocking},
        event::EventSender,
        store::Store,
        test_utils,
    };
    use assert_matches::assert_matches;
    use tempfile::TempDir;

    #[tokio::test(flavor = "multi_thread")]
    async fn fork() {
        test_utils::init_log();
        let (_base_dir, [branch0, branch1]) = setup().await;

        // Create a file owned by branch 0.
        let mut file0 = branch0.ensure_file_exists("dog.jpg".into()).await.unwrap();
        file0.write_all(b"small").await.unwrap();
        file0.flush().await.unwrap();
        drop(file0);

        // Open the file, fork it into branch 1 and modify it.
        let mut file1 = branch0
            .open_root(DirectoryLocking::Enabled, DirectoryFallback::Disabled)
            .await
            .unwrap()
            .lookup("dog.jpg")
            .unwrap()
            .file()
            .unwrap()
            .open()
            .await
            .unwrap();

        file1.fork(branch1.clone()).await.unwrap();
        file1.write_all(b"large").await.unwrap();
        file1.flush().await.unwrap();

        // Reopen the original file and verify it's unchanged.
        let mut file = branch0
            .open_root(DirectoryLocking::Enabled, DirectoryFallback::Disabled)
            .await
            .unwrap()
            .lookup("dog.jpg")
            .unwrap()
            .file()
            .unwrap()
            .open()
            .await
            .unwrap();

        assert_eq!(file.read_to_end().await.unwrap(), b"small");

        // Reopen the forked file and verify it's modified.
        let mut file = branch1
            .open_root(DirectoryLocking::Enabled, DirectoryFallback::Disabled)
            .await
            .unwrap()
            .lookup("dog.jpg")
            .unwrap()
            .file()
            .unwrap()
            .open()
            .await
            .unwrap();

        assert_eq!(file.read_to_end().await.unwrap(), b"large");
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn multiple_consecutive_modifications_of_forked_file() {
        // This test makes sure that modifying a forked file properly updates the file metadata so
        // that subsequent modifications work correctly.

        let (_base_dir, [branch0, branch1]) = setup().await;

        let mut file0 = branch0.ensure_file_exists("/pig.jpg".into()).await.unwrap();
        file0.flush().await.unwrap();

        let mut file1 = branch0
            .open_root(DirectoryLocking::Enabled, DirectoryFallback::Disabled)
            .await
            .unwrap()
            .lookup("pig.jpg")
            .unwrap()
            .file()
            .unwrap()
            .open()
            .await
            .unwrap();

        file1.fork(branch1).await.unwrap();

        for _ in 0..2 {
            file1.write_all(b"oink").await.unwrap();
            file1.flush().await.unwrap();
        }
    }

    // TODO: Currently concurrent writes are not allowed and this test simply asserts that. When
    // concurrent writes are implemented, we should remove this test and replace it with a series
    // of tests for various write concurrency cases.
    #[tokio::test(flavor = "multi_thread")]
    async fn concurrent_writes() {
        let (_base_dir, [branch]) = setup().await;

        let mut file0 = branch.ensure_file_exists("fox.txt".into()).await.unwrap();
        let mut file1 = branch
            .open_root(DirectoryLocking::Enabled, DirectoryFallback::Disabled)
            .await
            .unwrap()
            .lookup("fox.txt")
            .unwrap()
            .file()
            .unwrap()
            .open()
            .await
            .unwrap();

        file0.write_all(b"yip-yap").await.unwrap();
        assert_matches!(file1.write_all(b"ring-ding-ding").await, Err(Error::Locked));
        assert_matches!(file1.truncate(0), Err(Error::Locked));
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn copy_to_writer() {
        use tokio::{fs, io::AsyncReadExt};

        let (base_dir, [branch]) = setup().await;
        let src_content = b"hello world";

        let mut src = branch.ensure_file_exists("src.txt".into()).await.unwrap();
        src.write_all(src_content).await.unwrap();

        let dst_path = base_dir.path().join("dst.txt");
        let mut dst = fs::File::create(&dst_path).await.unwrap();
        src.seek(SeekFrom::Start(0));
        src.copy_to_writer(&mut dst).await.unwrap();
        dst.sync_all().await.unwrap();
        drop(dst);

        let mut dst = fs::File::open(&dst_path).await.unwrap();
        let mut dst_content = Vec::new();
        dst.read_to_end(&mut dst_content).await.unwrap();

        assert_eq!(dst_content, src_content);
    }

    async fn setup<const N: usize>() -> (TempDir, [Branch; N]) {
        let (base_dir, pool) = db::create_temp().await.unwrap();
        let store = Store::new(pool);
        let keys = AccessKeys::from(WriteSecrets::random());
        let event_tx = EventSender::new(1);
        let shared = BranchShared::new();

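        // Each branch gets its own random id but shares the same store, access keys and event
        // channel.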
        let branches = [(); N].map(|_| {
            create_branch(
                store.clone(),
                event_tx.clone(),
                keys.clone(),
                shared.clone(),
            )
        });

        (base_dir, branches)
    }

    fn create_branch(
        store: Store,
        event_tx: EventSender,
        keys: AccessKeys,
        shared: BranchShared,
    ) -> Branch {
        let id = PublicKey::random();
        Branch::new(id, store, keys, shared, event_tx)
    }
}