ouisync/blob/mod.rs

pub(crate) mod lock;

mod block_ids;
mod id;
mod position;

#[cfg(test)]
mod tests;

pub(crate) use self::{block_ids::BlockIds, id::BlobId};

use self::position::Position;
use crate::{
    branch::Branch,
    collections::HashMap,
    crypto::{
        cipher::{self, Nonce, SecretKey},
        sign::{Keypair, PublicKey},
        Hashable,
    },
    error::{Error, Result},
    protocol::{
        Block, BlockContent, BlockId, BlockNonce, Locator, RootNode, RootNodeFilter,
        SingleBlockPresence, BLOCK_SIZE,
    },
    store::{self, Changeset, ReadTransaction},
};
use std::{collections::hash_map::Entry, io::SeekFrom, iter, mem};
use thiserror::Error;

/// Size of the blob header in bytes.
// Using u64 instead of usize because HEADER_SIZE must be the same irrespective of whether we're
// on a 32-bit or 64-bit processor (if we want two such replicas to be able to sync).
pub const HEADER_SIZE: usize = mem::size_of::<u64>();

// Max number of blocks processed in a single batch (write transaction). Increasing this number
// decreases the number of flushes needed during writes but increases the complexity of the
// individual flushes.
pub const BATCH_SIZE: usize = 2048; // 64 MiB
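// (2048 blocks × 32 KiB per block = 64 MiB, assuming the 32 KiB `BLOCK_SIZE` implied by the
// figure above.)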

#[derive(Debug, Error)]
pub(crate) enum ReadWriteError {
    #[error("block not found in the cache")]
    CacheMiss,
    #[error("cache is full")]
    CacheFull,
}

pub(crate) struct Blob {
    branch: Branch,
    id: BlobId,
    cache: HashMap<u32, CachedBlock>,
    len_original: u64,
    len_modified: u64,
    position: Position,
}

impl Blob {
    /// Opens an existing blob.
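    ///
    /// # Example
    ///
    /// A minimal sketch; the `branch` and `blob_id` values and the surrounding error handling
    /// are assumptions, not taken from this crate's tests:
    ///
    /// ```ignore
    /// let mut tx = branch.store().begin_read().await?;
    /// let blob = Blob::open(&mut tx, branch.clone(), blob_id).await?;
    /// println!("blob is {} bytes long", blob.len());
    /// ```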
    pub async fn open(tx: &mut ReadTransaction, branch: Branch, id: BlobId) -> Result<Self> {
        let root_node = tx
            .load_latest_approved_root_node(branch.id(), RootNodeFilter::Any)
            .await?;
        Self::open_at(tx, &root_node, branch, id).await
    }

    pub async fn open_at(
        tx: &mut ReadTransaction,
        root_node: &RootNode,
        branch: Branch,
        id: BlobId,
    ) -> Result<Self> {
        assert_eq!(root_node.proof.writer_id, *branch.id());

        let (_, buffer) =
            read_block(tx, root_node, &Locator::head(id), branch.keys().read()).await?;

        let len = buffer.read_u64(0);
        let cached_block = CachedBlock::from(buffer);
        let cache = iter::once((0, cached_block)).collect();
        let position = Position::ZERO;

        Ok(Self {
            branch,
            id,
            cache,
            len_original: len,
            len_modified: len,
            position,
        })
    }

    /// Creates a new blob.
    pub fn create(branch: Branch, id: BlobId) -> Self {
        let cached_block = CachedBlock::new().with_dirty(true);
        let cache = iter::once((0, cached_block)).collect();

        Self {
            branch,
            id,
            cache,
            len_original: 0,
            len_modified: 0,
            position: Position::ZERO,
        }
    }

    pub fn branch(&self) -> &Branch {
        &self.branch
    }

    /// Id of this blob.
    pub fn id(&self) -> &BlobId {
        &self.id
    }

    /// Length of this blob in bytes.
    pub fn len(&self) -> u64 {
        self.len_modified
    }

    /// Returns the current seek position from the start of the blob.
    pub fn seek_position(&self) -> u64 {
        self.position.get()
    }

    pub fn block_count(&self) -> u32 {
        block_count(self.len())
    }

    /// Was this blob modified and not flushed yet?
    pub fn is_dirty(&self) -> bool {
        self.cache.values().any(|block| block.dirty) || self.len_modified != self.len_original
    }

    /// Seek to an offset in the blob.
    ///
    /// It is allowed to specify an offset outside of the range of the blob, but such an offset
    /// will be clamped to the range.
    ///
    /// Returns the new seek position from the start of the blob.
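    ///
    /// # Example
    ///
    /// A sketch of the clamping behaviour (hypothetical `blob` that already holds some data):
    ///
    /// ```ignore
    /// let pos = blob.seek(SeekFrom::End(1000)); // seek past the end...
    /// assert_eq!(pos, blob.len());              // ...is clamped to the blob length
    /// ```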
    pub fn seek(&mut self, pos: SeekFrom) -> u64 {
        let position = match pos {
            SeekFrom::Start(n) => n.min(self.len()),
            SeekFrom::End(n) => {
                if n >= 0 {
                    self.len()
                } else {
                    self.len().saturating_sub((-n) as u64)
                }
            }
            SeekFrom::Current(n) => {
                if n >= 0 {
                    self.seek_position()
                        .saturating_add(n as u64)
                        .min(self.len())
                } else {
                    self.seek_position().saturating_sub((-n) as u64)
                }
            }
        };

        self.position.set(position);

        position
    }

    /// Reads data from this blob into `buffer`, advancing the internal cursor. Returns the
    /// number of bytes actually read which might be less than `buffer.len()`.
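    ///
    /// Note this is not async: it only reads from the in-memory block cache. A
    /// [`ReadWriteError::CacheMiss`] means the caller should load the current block first (e.g.
    /// via [`Self::warmup`]) and retry, which is exactly the loop that `read_all_at` drives.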
    pub fn read(&mut self, buffer: &mut [u8]) -> Result<usize, ReadWriteError> {
        if self.position.get() >= self.len() {
            return Ok(0);
        }

        let block = match self.cache.get(&self.position.block) {
            Some(block) => block,
            None => {
                if self.check_cache_capacity() {
                    return Err(ReadWriteError::CacheMiss);
                } else {
                    return Err(ReadWriteError::CacheFull);
                }
            }
        };

        // minimum of:
        // - buffer length
        // - remaining size of the current block
        // - remaining size of the whole blob
        let read_len = buffer
            .len()
            .min(block.content.len() - self.position.offset)
            .min(self.len() as usize - self.position.get() as usize);

        block
            .content
            .read(self.position.offset, &mut buffer[..read_len]);

        self.position.advance(read_len);

        Ok(read_len)
    }

    #[cfg(test)]
    pub async fn read_all(&mut self, tx: &mut ReadTransaction, buffer: &mut [u8]) -> Result<usize> {
        let root_node = tx
            .load_latest_approved_root_node(self.branch.id(), RootNodeFilter::Any)
            .await?;
        self.read_all_at(tx, &root_node, buffer).await
    }

    pub async fn read_all_at(
        &mut self,
        tx: &mut ReadTransaction,
        root_node: &RootNode,
        buffer: &mut [u8],
    ) -> Result<usize> {
        assert_eq!(root_node.proof.writer_id, *self.branch.id());

        let mut offset = 0;

        loop {
            match self.read(&mut buffer[offset..]) {
                Ok(0) => break,
                Ok(len) => {
                    offset += len;
                }
                Err(ReadWriteError::CacheMiss) => self.warmup_at(tx, root_node).await?,
                Err(ReadWriteError::CacheFull) => {
                    tracing::error!("cache full");
                    return Err(Error::OperationNotSupported);
                }
            }
        }

        Ok(offset)
    }

    /// Read all data from this blob from the current seek position until the end and return them
    /// in a `Vec`.
    #[cfg(test)]
    pub async fn read_to_end(&mut self, tx: &mut ReadTransaction) -> Result<Vec<u8>> {
        let root_node = tx
            .load_latest_approved_root_node(self.branch.id(), RootNodeFilter::Any)
            .await?;
        self.read_to_end_at(tx, &root_node).await
    }

    /// Read all data from this blob at the given snapshot from the current seek position until
    /// the end and return them in a `Vec`.
    pub async fn read_to_end_at(
        &mut self,
        tx: &mut ReadTransaction,
        root_node: &RootNode,
    ) -> Result<Vec<u8>> {
        // Allocate the buffer `CHUNK_SIZE` bytes at a time to prevent blowing up the memory in
        // case the blob header was tampered with.
        const CHUNK_SIZE: usize = 1024 * 1024;

        let total = (self.len() - self.seek_position())
            .try_into()
            .unwrap_or(usize::MAX);

        let mut buffer = Vec::new();
        let mut offset = 0;

        while offset < total {
            buffer.resize(buffer.len() + CHUNK_SIZE.min(total - offset), 0);
            offset += self
                .read_all_at(tx, root_node, &mut buffer[offset..])
                .await?;
        }

        Ok(buffer)
    }

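    /// Writes `buffer` into this blob at the current seek position, advancing it. Returns the
    /// number of bytes actually written, which might be less than `buffer.len()`.
    ///
    /// Like `read`, this only touches the in-memory cache: `CacheMiss` means the current block
    /// must be loaded first (`warmup`) and `CacheFull` means dirty blocks must be flushed first;
    /// `write_all` below drives both retries.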
    pub fn write(&mut self, buffer: &[u8]) -> Result<usize, ReadWriteError> {
        if buffer.is_empty() {
            return Ok(0);
        }

        let block = match self.cache.get_mut(&self.position.block) {
            Some(block) => block,
            None => {
                if !self.check_cache_capacity() {
                    return Err(ReadWriteError::CacheFull);
                }

                // A block that's not in the cache can be written without loading it first only
                // if it lies past the current end of the blob or is about to be completely
                // overwritten; otherwise report a cache miss so the caller loads it.
                if self.position.get() >= self.len_modified
                    || self.position.offset == 0 && buffer.len() >= BLOCK_SIZE
                {
                    self.cache.entry(self.position.block).or_default()
                } else {
                    return Err(ReadWriteError::CacheMiss);
                }
            }
        };

        let write_len = buffer.len().min(block.content.len() - self.position.offset);

        block
            .content
            .write(self.position.offset, &buffer[..write_len]);
        block.dirty = true;

        self.position.advance(write_len);
        self.len_modified = self.len_modified.max(self.position.get());

        Ok(write_len)
    }

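    /// Writes all of `buffer` into this blob, transparently loading blocks on cache misses and
    /// flushing dirty blocks into `changeset` when the cache fills up.
    ///
    /// # Example
    ///
    /// A minimal sketch of the whole write path; `branch` and `blob_id` are assumptions, and a
    /// write transaction is assumed to be usable wherever a `ReadTransaction` is expected:
    ///
    /// ```ignore
    /// let write_keys = branch.keys().write().ok_or(Error::PermissionDenied)?;
    /// let mut tx = branch.store().begin_write().await?;
    /// let mut changeset = Changeset::new();
    ///
    /// let mut blob = Blob::create(branch.clone(), blob_id);
    /// blob.write_all(&mut tx, &mut changeset, b"hello world").await?;
    /// blob.flush(&mut tx, &mut changeset).await?;
    ///
    /// changeset.apply(&mut tx, branch.id(), write_keys).await?;
    /// tx.commit().await?;
    /// ```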
    pub async fn write_all(
        &mut self,
        tx: &mut ReadTransaction,
        changeset: &mut Changeset,
        buffer: &[u8],
    ) -> Result<()> {
        let mut offset = 0;

        loop {
            match self.write(&buffer[offset..]) {
                Ok(0) => break,
                Ok(len) => {
                    offset += len;
                }
                Err(ReadWriteError::CacheMiss) => {
                    self.warmup(tx).await?;
                }
                Err(ReadWriteError::CacheFull) => {
                    self.flush(tx, changeset).await?;
                }
            }
        }

        Ok(())
    }

    /// Load the current block into the cache.
    pub async fn warmup(&mut self, tx: &mut ReadTransaction) -> Result<()> {
        let root_node = tx
            .load_latest_approved_root_node(self.branch.id(), RootNodeFilter::Any)
            .await?;
        self.warmup_at(tx, &root_node).await?;

        Ok(())
    }

    /// Load the current block at the given snapshot into the cache.
    pub async fn warmup_at(
        &mut self,
        tx: &mut ReadTransaction,
        root_node: &RootNode,
    ) -> Result<()> {
        match self.cache.entry(self.position.block) {
            Entry::Occupied(_) => (),
            Entry::Vacant(entry) => {
                let locator = Locator::head(self.id).nth(self.position.block);
                let (_, buffer) =
                    read_block(tx, root_node, &locator, self.branch.keys().read()).await?;
                entry.insert(CachedBlock::from(buffer));
            }
        }

        Ok(())
    }

    /// Truncate the blob to the given length.
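    ///
    /// Only shrinking is supported: requesting a length greater than the current one fails with
    /// `Error::OperationNotSupported`. Note this only updates the in-memory length; the change
    /// is persisted on the next `flush`.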
    pub fn truncate(&mut self, len: u64) -> Result<()> {
        if len == self.len() {
            return Ok(());
        }

        if len > self.len() {
            // TODO: consider supporting this
            return Err(Error::OperationNotSupported);
        }

        if self.seek_position() > len {
            self.seek(SeekFrom::Start(len));
        }

        self.len_modified = len;

        Ok(())
    }

    /// Flushes this blob, ensuring that all intermediately buffered content gets written into
    /// `changeset` (and thus to the store once the changeset is applied).
    pub(crate) async fn flush(
        &mut self,
        tx: &mut ReadTransaction,
        changeset: &mut Changeset,
    ) -> Result<()> {
        self.write_len(tx, changeset).await?;
        self.write_blocks(changeset);

        Ok(())
    }

    /// Remove this blob from the store.
    pub(crate) fn remove(self, changeset: &mut Changeset) {
        let locators = Locator::head(self.id)
            .sequence()
            .take(self.block_count() as usize);

        for locator in locators {
            let encoded = locator.encode(self.branch().keys().read());
            changeset.unlink_block(encoded, None);
        }
    }

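    // Makes sure there is room in the cache for at least one more block, evicting some clean
    // (non-dirty) block if necessary. Returns `false` if the cache is full of dirty blocks, in
    // which case the caller needs to flush before more blocks can be cached.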
    fn check_cache_capacity(&mut self) -> bool {
        if self.cache.len() < BATCH_SIZE {
            return true;
        }

        let number = self
            .cache
            .iter()
            .find(|(_, block)| !block.dirty)
            .map(|(number, _)| *number);

        if let Some(number) = number {
            self.cache.remove(&number);
            true
        } else {
            false
        }
    }

    // Write the blob length into the header block, if it changed.
    async fn write_len(
        &mut self,
        tx: &mut ReadTransaction,
        changeset: &mut Changeset,
    ) -> Result<()> {
        if self.len_modified == self.len_original {
            return Ok(());
        }

        if let Some(block) = self.cache.get_mut(&0) {
            block.content.write_u64(0, self.len_modified);
            block.dirty = true;
        } else {
            let locator = Locator::head(self.id);
            let root_node = tx
                .load_latest_approved_root_node(self.branch.id(), RootNodeFilter::Any)
                .await?;
            let (_, mut content) =
                read_block(tx, &root_node, &locator, self.branch.keys().read()).await?;
            content.write_u64(0, self.len_modified);
            write_block(changeset, &locator, content, self.branch.keys().read());
        }

        self.len_original = self.len_modified;

        Ok(())
    }

    fn write_blocks(&mut self, changeset: &mut Changeset) {
        // Poor man's `drain_filter`.
        let cache = mem::take(&mut self.cache);
        let (dirty, clean): (HashMap<_, _>, _) =
            cache.into_iter().partition(|(_, block)| block.dirty);
        self.cache = clean;

        for (number, block) in dirty {
            let locator = Locator::head(self.id).nth(number);
            write_block(
                changeset,
                &locator,
                block.content,
                self.branch.keys().read(),
            );
        }
    }
}

// NOTE: Clone only creates a new instance of the same blob. It doesn't preserve dirtiness.
impl Clone for Blob {
    fn clone(&self) -> Self {
        Self {
            branch: self.branch.clone(),
            id: self.id,
            cache: HashMap::default(),
            len_original: self.len_original,
            len_modified: self.len_original,
            position: self.position,
        }
    }
}

#[derive(Default)]
struct CachedBlock {
    content: BlockContent,
    dirty: bool,
}

impl CachedBlock {
    fn new() -> Self {
        Self::default()
    }

    fn with_dirty(self, dirty: bool) -> Self {
        Self { dirty, ..self }
    }
}

impl From<BlockContent> for CachedBlock {
    fn from(content: BlockContent) -> Self {
        Self {
            content,
            dirty: false,
        }
    }
}

/// Creates a shallow copy (only the index nodes are copied, not blocks) of the specified blob
/// into the specified destination branch.
///
/// NOTE: This function is not atomic. However, it is idempotent, so in case it's interrupted, it
/// can be safely retried.
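///
/// # Example
///
/// A sketch (hypothetical `blob_id` and branches; `dst` must have write access or
/// `Error::PermissionDenied` is returned):
///
/// ```ignore
/// fork(blob_id, &src, &dst).await?;
/// ```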
pub(crate) async fn fork(blob_id: BlobId, src_branch: &Branch, dst_branch: &Branch) -> Result<()> {
    // If the src and dst branches are the same, the blob is trivially already forked. Do nothing
    // but still return Ok to maintain idempotency.
    if src_branch.id() == dst_branch.id() {
        return Ok(());
    }

    let read_key = src_branch.keys().read();
    // Take the write key from the dst branch, not the src branch, to protect us against
    // accidentally forking into a remote branch (remote branches don't have write access).
    let write_keys = dst_branch.keys().write().ok_or(Error::PermissionDenied)?;

    // FIXME: The src blob can change in the middle of the fork which could cause the dst blob to
    // become corrupted (part of it will be forked pre-change and part post-change). To prevent
    // that, we should restart the fork every time the src branch changes, or - better - run the
    // whole fork in a single transaction (but somehow avoid blocking other tasks).

    let end = {
        let mut tx = src_branch.store().begin_read().await?;
        let root_node = tx
            .load_latest_approved_root_node(src_branch.id(), RootNodeFilter::Any)
            .await?;
        load_block_count_hint(&mut tx, &root_node, blob_id, src_branch.keys().read()).await?
    };

    struct Batch {
        tx: crate::store::WriteTransaction,
        changeset: Changeset,
    }

    impl Batch {
        async fn apply(self, dst_branch_id: &PublicKey, write_keys: &Keypair) -> Result<()> {
            let Batch { mut tx, changeset } = self;
            changeset.apply(&mut tx, dst_branch_id, write_keys).await?;
            tx.commit().await?;
            Ok(())
        }
    }

    let mut batch = None;
    let locators = Locator::head(blob_id).sequence().take(end as usize);

    for locator in locators {
        let Batch { tx, changeset } = if let Some(batch) = &mut batch {
            batch
        } else {
            batch.insert(Batch {
                tx: src_branch.store().begin_write().await?,
                changeset: Changeset::new(),
            })
        };

        let encoded_locator = locator.encode(read_key);

        let (block_id, block_presence) =
            match tx.find_block(src_branch.id(), &encoded_locator).await {
                Ok(id) => id,
                Err(store::Error::LocatorNotFound) => {
                    // end of the blob
                    break;
                }
                Err(error) => return Err(error.into()),
            };

        changeset.link_block(encoded_locator, block_id, block_presence);

        // The `+ 1` is there so the batch is not applied already after the first block (whose
        // locator number is 0).
        if (locator.number() + 1) as usize % BATCH_SIZE == 0 {
            if let Some(batch) = batch.take() {
                batch.apply(dst_branch.id(), write_keys).await?;
            }
        }

        tracing::trace!(
            num = locator.number(),
            block_id = ?block_id,
            ?block_presence,
            "fork block",
        );
    }

    if let Some(batch) = batch {
        batch.apply(dst_branch.id(), write_keys).await?;
    }

    Ok(())
}

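// Number of blocks needed to store `len` bytes of blob content plus the `HEADER_SIZE`-byte
// header, i.e. `ceil((len + HEADER_SIZE) / BLOCK_SIZE)` (always at least 1).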
fn block_count(len: u64) -> u32 {
    // https://stackoverflow.com/questions/2745074/fast-ceiling-of-an-integer-division-in-c-c
    (1 + (len + HEADER_SIZE as u64 - 1) / BLOCK_SIZE as u64)
        .try_into()
        .unwrap_or(u32::MAX)
}

async fn read_len(
    tx: &mut ReadTransaction,
    root_node: &RootNode,
    blob_id: BlobId,
    read_key: &cipher::SecretKey,
) -> Result<u64> {
    let (_, buffer) = read_block(tx, root_node, &Locator::head(blob_id), read_key).await?;
    Ok(buffer.read_u64(0))
}

// Returns the max number of blocks the specified blob has. This either returns the actual number
// or `u32::MAX` in case the first block is not available and so the blob length can't be obtained.
async fn load_block_count_hint(
    tx: &mut ReadTransaction,
    root_node: &RootNode,
    blob_id: BlobId,
    read_key: &cipher::SecretKey,
) -> Result<u32> {
    match read_len(tx, root_node, blob_id, read_key).await {
        Ok(len) => Ok(block_count(len)),
        Err(Error::Store(store::Error::BlockNotFound)) => Ok(u32::MAX),
        Err(error) => Err(error),
    }
}

async fn read_block(
    tx: &mut ReadTransaction,
    root_node: &RootNode,
    locator: &Locator,
    read_key: &cipher::SecretKey,
) -> Result<(BlockId, BlockContent)> {
    let (block_id, _) = tx
        .find_block_at(root_node, &locator.encode(read_key))
        .await
        .inspect_err(|error| {
            tracing::trace!(?error, root_hash = ?root_node.proof.hash, ?locator);
        })?;

    let mut content = BlockContent::new();
    let nonce = tx
        .read_block(&block_id, &mut content)
        .await
        .inspect_err(|error| {
            tracing::trace!(?error, root_hash = ?root_node.proof.hash, ?locator, ?block_id);
        })?;

    decrypt_block(read_key, &nonce, &mut content);

    Ok((block_id, content))
}

fn write_block(
    changeset: &mut Changeset,
    locator: &Locator,
    mut content: BlockContent,
    read_key: &cipher::SecretKey,
) -> BlockId {
    let nonce = make_block_nonce(locator, &content, read_key);
    encrypt_block(read_key, &nonce, &mut content);

    let block = Block::new(content, nonce);
    let block_id = block.id;

    changeset.link_block(
        locator.encode(read_key),
        block.id,
        SingleBlockPresence::Present,
    );
    changeset.write_block(block);

    tracing::trace!(?locator, ?block_id, "write block");

    block_id
}

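// Each block is en/decrypted with a key derived from the blob key and the block's nonce. Since
// distinct blocks have distinct nonces (see `make_block_nonce` below), each derived key is used
// for only one block, which is why a constant cipher `Nonce` is safe here.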
fn decrypt_block(blob_key: &cipher::SecretKey, block_nonce: &BlockNonce, content: &mut [u8]) {
    let block_key = SecretKey::derive_from_key(blob_key.as_array(), block_nonce);
    block_key.decrypt_no_aead(&Nonce::default(), content);
}

fn encrypt_block(blob_key: &cipher::SecretKey, block_nonce: &BlockNonce, content: &mut [u8]) {
    let block_key = SecretKey::derive_from_key(blob_key.as_array(), block_nonce);
    block_key.encrypt_no_aead(&Nonce::default(), content);
}

/// Compute nonce for a block at the given locator and with the given plaintext content.
///
/// This function is deterministic so for a given block at a given locator it produces the same
/// nonce. This is not a nonce reuse because the only way two blocks can have the same nonce is if
/// they have the same content and are at the same locator which means they are in fact the same
/// block, just referenced from two different branches.
///
/// The reason nonces are computed this way instead of randomly is to guarantee two blocks with the
/// same content at the same locators but in different branches have the same nonce and thus the
/// same block_id even if they were created independently (as opposed to linking an existing block
/// from another branch). This in turn guarantees that two branches with identical content have the
/// same hash.
///
/// Note: `read_key` is used as an additional secret hashing material to prevent known plaintext
/// attacks.
fn make_block_nonce(
    locator: &Locator,
    plaintext_content: &[u8],
    read_key: &cipher::SecretKey,
) -> BlockNonce {
    (read_key.as_ref(), locator, plaintext_content)
        .hash()
        .into()
}