ouisync/blob/
mod.rs

1pub(crate) mod lock;
2
3mod block_ids;
4mod id;
5mod position;
6
7#[cfg(test)]
8mod tests;
9
10pub(crate) use self::{block_ids::BlockIds, id::BlobId};
11
12use self::position::Position;
13use crate::{
14    branch::Branch,
15    crypto::{
16        Hashable,
17        cipher::{self, Nonce, SecretKey},
18        sign::{Keypair, PublicKey},
19    },
20    error::{Error, Result},
21    protocol::{
22        BLOCK_SIZE, Block, BlockContent, BlockId, BlockNonce, Locator, RootNode, RootNodeFilter,
23        SingleBlockPresence,
24    },
25    store::{self, Changeset, ReadTransaction},
26};
27use std::{
28    collections::{HashMap, hash_map::Entry},
29    io::SeekFrom,
30    iter, mem,
31};
32use thiserror::Error;
33
/// Size of the blob header in bytes.
// Using u64 instead of usize because HEADER_SIZE must be the same irrespective of whether we're on
// a 32bit or 64bit processor (if we want two such replicas to be able to sync).
pub const HEADER_SIZE: usize = mem::size_of::<u64>();

// Max number of blocks processed in a single batch (write transaction). Increasing this number
// decreases the number of flushes needed during writes but increases the complexity of the
// individual flushes.
pub const BATCH_SIZE: usize = 2048; // 64 MiB
43
/// Error returned by the non-async [`Blob::read`] / [`Blob::write`] operations when they can't
/// proceed using only the in-memory block cache. The caller recovers by calling `warmup` (on
/// `CacheMiss`) or `flush` (on `CacheFull`) and retrying.
#[derive(Debug, Error)]
pub(crate) enum ReadWriteError {
    #[error("block not found in the cache")]
    CacheMiss,
    #[error("cache is full")]
    CacheFull,
}
51
pub(crate) struct Blob {
    branch: Branch,
    id: BlobId,
    // Cache of loaded/modified blocks, keyed by block number within the blob. Capped at
    // `BATCH_SIZE` entries (see `check_cache_capacity`).
    cache: HashMap<u32, CachedBlock>,
    // Blob length as currently flushed to the store.
    len_original: u64,
    // Blob length including unflushed modifications.
    len_modified: u64,
    // Current seek position (tracks both block number and offset within the block).
    position: Position,
}
60
61impl Blob {
62    /// Opens an existing blob.
63    pub async fn open(tx: &mut ReadTransaction, branch: Branch, id: BlobId) -> Result<Self> {
64        let root_node = tx
65            .load_latest_approved_root_node(branch.id(), RootNodeFilter::Any)
66            .await?;
67        Self::open_at(tx, &root_node, branch, id).await
68    }
69
70    pub async fn open_at(
71        tx: &mut ReadTransaction,
72        root_node: &RootNode,
73        branch: Branch,
74        id: BlobId,
75    ) -> Result<Self> {
76        assert_eq!(root_node.proof.writer_id, *branch.id());
77
78        let (_, buffer) =
79            read_block(tx, root_node, &Locator::head(id), branch.keys().read()).await?;
80
81        let len = buffer.read_u64(0);
82        let cached_block = CachedBlock::from(buffer);
83        let cache = iter::once((0, cached_block)).collect();
84        let position = Position::ZERO;
85
86        Ok(Self {
87            branch,
88            id,
89            cache,
90            len_original: len,
91            len_modified: len,
92            position,
93        })
94    }
95
96    /// Creates a new blob.
97    pub fn create(branch: Branch, id: BlobId) -> Self {
98        let cached_block = CachedBlock::new().with_dirty(true);
99        let cache = iter::once((0, cached_block)).collect();
100
101        Self {
102            branch,
103            id,
104            cache,
105            len_original: 0,
106            len_modified: 0,
107            position: Position::ZERO,
108        }
109    }
110
111    pub fn branch(&self) -> &Branch {
112        &self.branch
113    }
114
115    /// Id of this blob.
116    pub fn id(&self) -> &BlobId {
117        &self.id
118    }
119
120    /// Length of this blob in bytes.
121    pub fn len(&self) -> u64 {
122        self.len_modified
123    }
124
125    // Returns the current seek position from the start of the blob.
126    pub fn seek_position(&self) -> u64 {
127        self.position.get()
128    }
129
130    pub fn block_count(&self) -> u32 {
131        block_count(self.len())
132    }
133
134    /// Was this blob modified and not flushed yet?
135    pub fn is_dirty(&self) -> bool {
136        self.cache.values().any(|block| block.dirty) || self.len_modified != self.len_original
137    }
138
139    /// Seek to an offset in the blob.
140    ///
141    /// It is allowed to specify offset that is outside of the range of the blob but such offset
142    /// will be clamped to be within the range.
143    ///
144    /// Returns the new seek position from the start of the blob.
145    pub fn seek(&mut self, pos: SeekFrom) -> u64 {
146        let position = match pos {
147            SeekFrom::Start(n) => n.min(self.len()),
148            SeekFrom::End(n) => {
149                if n >= 0 {
150                    self.len()
151                } else {
152                    self.len().saturating_sub((-n) as u64)
153                }
154            }
155            SeekFrom::Current(n) => {
156                if n >= 0 {
157                    self.seek_position()
158                        .saturating_add(n as u64)
159                        .min(self.len())
160                } else {
161                    self.seek_position().saturating_sub((-n) as u64)
162                }
163            }
164        };
165
166        self.position.set(position);
167
168        position
169    }
170
171    /// Reads data from this blob into `buffer`, advancing the internal cursor. Returns the
172    /// number of bytes actually read which might be less than `buffer.len()`.
173    pub fn read(&mut self, buffer: &mut [u8]) -> Result<usize, ReadWriteError> {
174        if self.position.get() >= self.len() {
175            return Ok(0);
176        }
177
178        let block = match self.cache.get(&self.position.block) {
179            Some(block) => block,
180            None => {
181                if self.check_cache_capacity() {
182                    return Err(ReadWriteError::CacheMiss);
183                } else {
184                    return Err(ReadWriteError::CacheFull);
185                }
186            }
187        };
188
189        // minimum of:
190        // - buffer length
191        // - remaining size of the current block
192        // - remaining size of the whole blob
193        let read_len = buffer
194            .len()
195            .min(block.content.len() - self.position.offset)
196            .min(self.len() as usize - self.position.get() as usize);
197
198        block
199            .content
200            .read(self.position.offset, &mut buffer[..read_len]);
201
202        self.position.advance(read_len);
203
204        Ok(read_len)
205    }
206
207    #[cfg(test)]
208    pub async fn read_all(&mut self, tx: &mut ReadTransaction, buffer: &mut [u8]) -> Result<usize> {
209        let root_node = tx
210            .load_latest_approved_root_node(self.branch.id(), RootNodeFilter::Any)
211            .await?;
212        self.read_all_at(tx, &root_node, buffer).await
213    }
214
215    pub async fn read_all_at(
216        &mut self,
217        tx: &mut ReadTransaction,
218        root_node: &RootNode,
219        buffer: &mut [u8],
220    ) -> Result<usize> {
221        assert_eq!(root_node.proof.writer_id, *self.branch.id());
222
223        let mut offset = 0;
224
225        loop {
226            match self.read(&mut buffer[offset..]) {
227                Ok(0) => break,
228                Ok(len) => {
229                    offset += len;
230                }
231                Err(ReadWriteError::CacheMiss) => self.warmup_at(tx, root_node).await?,
232                Err(ReadWriteError::CacheFull) => {
233                    tracing::error!("cache full");
234                    return Err(Error::OperationNotSupported);
235                }
236            }
237        }
238
239        Ok(offset)
240    }
241
242    /// Read all data from this blob from the current seek position until the end and return then
243    /// in a `Vec`.
244    #[cfg(test)]
245    pub async fn read_to_end(&mut self, tx: &mut ReadTransaction) -> Result<Vec<u8>> {
246        let root_node = tx
247            .load_latest_approved_root_node(self.branch.id(), RootNodeFilter::Any)
248            .await?;
249        self.read_to_end_at(tx, &root_node).await
250    }
251
252    /// Read all data from this blob at the given snapshot from the current seek position until the
253    /// end and return then in a `Vec`.
254    pub async fn read_to_end_at(
255        &mut self,
256        tx: &mut ReadTransaction,
257        root_node: &RootNode,
258    ) -> Result<Vec<u8>> {
259        // Allocate the buffer `CHUNK_SIZE` bytes at a time to prevent blowing up the memory in
260        // case the blob header was tampered with.
261        const CHUNK_SIZE: usize = 1024 * 1024;
262
263        let total = (self.len() - self.seek_position())
264            .try_into()
265            .unwrap_or(usize::MAX);
266
267        let mut buffer = Vec::new();
268        let mut offset = 0;
269
270        while offset < total {
271            buffer.resize(buffer.len() + CHUNK_SIZE.min(total - offset), 0);
272            offset += self
273                .read_all_at(tx, root_node, &mut buffer[offset..])
274                .await?;
275        }
276
277        Ok(buffer)
278    }
279
280    pub fn write(&mut self, buffer: &[u8]) -> Result<usize, ReadWriteError> {
281        if buffer.is_empty() {
282            return Ok(0);
283        }
284
285        let block = match self.cache.get_mut(&self.position.block) {
286            Some(block) => block,
287            None => {
288                if !self.check_cache_capacity() {
289                    return Err(ReadWriteError::CacheFull);
290                }
291
292                if self.position.get() >= self.len_modified
293                    || self.position.offset == 0 && buffer.len() >= BLOCK_SIZE
294                {
295                    self.cache.entry(self.position.block).or_default()
296                } else {
297                    return Err(ReadWriteError::CacheMiss);
298                }
299            }
300        };
301
302        let write_len = buffer.len().min(block.content.len() - self.position.offset);
303
304        block
305            .content
306            .write(self.position.offset, &buffer[..write_len]);
307        block.dirty = true;
308
309        self.position.advance(write_len);
310        self.len_modified = self.len_modified.max(self.position.get());
311
312        Ok(write_len)
313    }
314
315    pub async fn write_all(
316        &mut self,
317        tx: &mut ReadTransaction,
318        changeset: &mut Changeset,
319        buffer: &[u8],
320    ) -> Result<()> {
321        let mut offset = 0;
322
323        loop {
324            match self.write(&buffer[offset..]) {
325                Ok(0) => break,
326                Ok(len) => {
327                    offset += len;
328                }
329                Err(ReadWriteError::CacheMiss) => {
330                    self.warmup(tx).await?;
331                }
332                Err(ReadWriteError::CacheFull) => {
333                    self.flush(tx, changeset).await?;
334                }
335            }
336        }
337
338        Ok(())
339    }
340
341    /// Load the current block into the cache.
342    pub async fn warmup(&mut self, tx: &mut ReadTransaction) -> Result<()> {
343        let root_node = tx
344            .load_latest_approved_root_node(self.branch.id(), RootNodeFilter::Any)
345            .await?;
346        self.warmup_at(tx, &root_node).await?;
347
348        Ok(())
349    }
350
351    /// Load the current block at the given snapshot into the cache.
352    pub async fn warmup_at(
353        &mut self,
354        tx: &mut ReadTransaction,
355        root_node: &RootNode,
356    ) -> Result<()> {
357        match self.cache.entry(self.position.block) {
358            Entry::Occupied(_) => (),
359            Entry::Vacant(entry) => {
360                let locator = Locator::head(self.id).nth(self.position.block);
361                let (_, buffer) =
362                    read_block(tx, root_node, &locator, self.branch.keys().read()).await?;
363                entry.insert(CachedBlock::from(buffer));
364            }
365        }
366
367        Ok(())
368    }
369
370    /// Truncate the blob to the given length.
371    pub fn truncate(&mut self, len: u64) -> Result<()> {
372        if len == self.len() {
373            return Ok(());
374        }
375
376        if len > self.len() {
377            // TODO: consider supporting this
378            return Err(Error::OperationNotSupported);
379        }
380
381        if self.seek_position() > len {
382            self.seek(SeekFrom::Start(len));
383        }
384
385        self.len_modified = len;
386
387        Ok(())
388    }
389
390    /// Flushes this blob, ensuring that all intermediately buffered contents gets written to the
391    /// store.
392    pub(crate) async fn flush(
393        &mut self,
394        tx: &mut ReadTransaction,
395        changeset: &mut Changeset,
396    ) -> Result<()> {
397        self.write_len(tx, changeset).await?;
398        self.write_blocks(changeset);
399
400        Ok(())
401    }
402
403    /// Remove this blob from the store.
404    pub(crate) fn remove(self, changeset: &mut Changeset) {
405        let locators = Locator::head(self.id)
406            .sequence()
407            .take(self.block_count() as usize);
408
409        for locator in locators {
410            let encoded = locator.encode(self.branch().keys().read());
411            changeset.unlink_block(encoded, None);
412        }
413    }
414
415    fn check_cache_capacity(&mut self) -> bool {
416        if self.cache.len() < BATCH_SIZE {
417            return true;
418        }
419
420        let number = self
421            .cache
422            .iter()
423            .find(|(_, block)| !block.dirty)
424            .map(|(number, _)| *number);
425
426        if let Some(number) = number {
427            self.cache.remove(&number);
428            true
429        } else {
430            false
431        }
432    }
433
434    // Write length, if changed
435    async fn write_len(
436        &mut self,
437        tx: &mut ReadTransaction,
438        changeset: &mut Changeset,
439    ) -> Result<()> {
440        if self.len_modified == self.len_original {
441            return Ok(());
442        }
443
444        if let Some(block) = self.cache.get_mut(&0) {
445            block.content.write_u64(0, self.len_modified);
446            block.dirty = true;
447        } else {
448            let locator = Locator::head(self.id);
449            let root_node = tx
450                .load_latest_approved_root_node(self.branch.id(), RootNodeFilter::Any)
451                .await?;
452            let (_, mut content) =
453                read_block(tx, &root_node, &locator, self.branch.keys().read()).await?;
454            content.write_u64(0, self.len_modified);
455            write_block(changeset, &locator, content, self.branch.keys().read());
456        }
457
458        self.len_original = self.len_modified;
459
460        Ok(())
461    }
462
463    fn write_blocks(&mut self, changeset: &mut Changeset) {
464        // Poor man's `drain_filter`.
465        let cache = mem::take(&mut self.cache);
466        let (dirty, clean): (HashMap<_, _>, _) =
467            cache.into_iter().partition(|(_, block)| block.dirty);
468        self.cache = clean;
469
470        for (number, block) in dirty {
471            let locator = Locator::head(self.id).nth(number);
472            write_block(
473                changeset,
474                &locator,
475                block.content,
476                self.branch.keys().read(),
477            );
478        }
479    }
480}
481
// NOTE: Clone only creates a new instance of the same blob. It doesn't preserve dirtiness.
impl Clone for Blob {
    fn clone(&self) -> Self {
        Self {
            branch: self.branch.clone(),
            id: self.id,
            // Start with an empty cache - dirty (unflushed) blocks are not carried over.
            cache: HashMap::default(),
            len_original: self.len_original,
            // Deliberately `len_original`, not `len_modified`: an unflushed length change is
            // dropped together with the dirty blocks.
            len_modified: self.len_original,
            position: self.position,
        }
    }
}
495
/// A single block of the blob held in memory.
#[derive(Default)]
struct CachedBlock {
    // Plaintext (decrypted) content of the block.
    content: BlockContent,
    // `true` if the content was modified since it was loaded / last flushed.
    dirty: bool,
}
501
502impl CachedBlock {
503    fn new() -> Self {
504        Self::default()
505    }
506
507    fn with_dirty(self, dirty: bool) -> Self {
508        Self { dirty, ..self }
509    }
510}
511
/// Wraps freshly loaded block content as a clean (not yet modified) cache entry.
impl From<BlockContent> for CachedBlock {
    fn from(content: BlockContent) -> Self {
        Self {
            content,
            dirty: false,
        }
    }
}
520
/// Creates a shallow copy (only the index nodes are copied, not blocks) of the specified blob into
/// the specified destination branch.
///
/// NOTE: This function is not atomic. However, it is idempotent, so in case it's interrupted, it
/// can be safely retried.
pub(crate) async fn fork(blob_id: BlobId, src_branch: &Branch, dst_branch: &Branch) -> Result<()> {
    // If the blob is already forked, do nothing but still return Ok to maintain idempotency.
    if src_branch.id() == dst_branch.id() {
        return Ok(());
    }

    let read_key = src_branch.keys().read();
    // Take the write key from the dst branch, not the src branch, to protect us against
    // accidentally forking into remote branch (remote branches don't have write access).
    let write_keys = dst_branch.keys().write().ok_or(Error::PermissionDenied)?;

    // FIXME: The src blob can change in the middle of the fork which could cause the dst blob to
    // become corrupted (part of it will be forked pre-change and part post-change). To prevent
    // that, we should restart the fork every time the src branch changes, or - better - run the
    // whole fork in a single transaction (but somehow avoid blocking other tasks).

    // Upper bound on the number of blocks to fork (`u32::MAX` if the blob length is unknown
    // because the first block is missing).
    let end = {
        let mut tx = src_branch.store().begin_read().await?;
        let root_node = tx
            .load_latest_approved_root_node(src_branch.id(), RootNodeFilter::Any)
            .await?;
        load_block_count_hint(&mut tx, &root_node, blob_id, src_branch.keys().read()).await?
    };

    // A write transaction and the changeset accumulated for it so far. Committed and recreated
    // every `BATCH_SIZE` blocks to keep individual transactions small.
    struct Batch {
        tx: crate::store::WriteTransaction,
        changeset: Changeset,
    }

    impl Batch {
        // Applies the accumulated changeset and commits the transaction.
        async fn apply(self, dst_branch_id: &PublicKey, write_keys: &Keypair) -> Result<()> {
            let Batch { mut tx, changeset } = self;
            changeset.apply(&mut tx, dst_branch_id, write_keys).await?;
            tx.commit().await?;
            Ok(())
        }
    }

    let mut batch = None;
    let locators = Locator::head(blob_id).sequence().take(end as usize);

    for locator in locators {
        // Lazily (re)create the batch - it's `None` on the first iteration and right after each
        // commit.
        let Batch { tx, changeset } = if let Some(batch) = &mut batch {
            batch
        } else {
            batch.insert(Batch {
                tx: src_branch.store().begin_write().await?,
                changeset: Changeset::new(),
            })
        };

        let encoded_locator = locator.encode(read_key);

        let (block_id, block_presence) =
            match tx.find_block(src_branch.id(), &encoded_locator).await {
                Ok(id) => id,
                Err(store::Error::LocatorNotFound) => {
                    // end of the blob
                    break;
                }
                Err(error) => return Err(error.into()),
            };

        // Link the existing block into the dst branch - the block content itself is not copied.
        changeset.link_block(encoded_locator, block_id, block_presence);

        // The `+ 1` is there to not hit on the first run.
        if ((locator.number() + 1) as usize).is_multiple_of(BATCH_SIZE)
            && let Some(batch) = batch.take()
        {
            batch.apply(dst_branch.id(), write_keys).await?;
        }

        tracing::trace!(
            num = locator.number(),
            block_id = ?block_id,
            ?block_presence,
            "fork block",
        );
    }

    // Commit any remaining partially filled batch.
    if let Some(batch) = batch {
        batch.apply(dst_branch.id(), write_keys).await?;
    }

    Ok(())
}
612
613fn block_count(len: u64) -> u32 {
614    // https://stackoverflow.com/questions/2745074/fast-ceiling-of-an-integer-division-in-c-c
615    (1 + (len + HEADER_SIZE as u64 - 1) / BLOCK_SIZE as u64)
616        .try_into()
617        .unwrap_or(u32::MAX)
618}
619
620async fn read_len(
621    tx: &mut ReadTransaction,
622    root_node: &RootNode,
623    blob_id: BlobId,
624    read_key: &cipher::SecretKey,
625) -> Result<u64> {
626    let (_, buffer) = read_block(tx, root_node, &Locator::head(blob_id), read_key).await?;
627    Ok(buffer.read_u64(0))
628}
629
630// Returns the max number of blocks the specified blob has. This either returns the actual number
631// or `u32::MAX` in case the first blob is not available and so the blob length can't be obtained.
632async fn load_block_count_hint(
633    tx: &mut ReadTransaction,
634    root_node: &RootNode,
635    blob_id: BlobId,
636    read_key: &cipher::SecretKey,
637) -> Result<u32> {
638    match read_len(tx, root_node, blob_id, read_key).await {
639        Ok(len) => Ok(block_count(len)),
640        Err(Error::Store(store::Error::BlockNotFound)) => Ok(u32::MAX),
641        Err(error) => Err(error),
642    }
643}
644
645async fn read_block(
646    tx: &mut ReadTransaction,
647    root_node: &RootNode,
648    locator: &Locator,
649    read_key: &cipher::SecretKey,
650) -> Result<(BlockId, BlockContent)> {
651    let (block_id, _) = tx
652        .find_block_at(root_node, &locator.encode(read_key))
653        .await
654        .inspect_err(|error| {
655            tracing::trace!(?error, root_hash = ?root_node.proof.hash, ?locator);
656        })?;
657
658    let mut content = BlockContent::new();
659    let nonce = tx
660        .read_block(&block_id, &mut content)
661        .await
662        .inspect_err(|error| {
663            tracing::trace!(?error, root_hash = ?root_node.proof.hash, ?locator, ?block_id);
664        })?;
665
666    decrypt_block(read_key, &nonce, &mut content);
667
668    Ok((block_id, content))
669}
670
671fn write_block(
672    changeset: &mut Changeset,
673    locator: &Locator,
674    mut content: BlockContent,
675    read_key: &cipher::SecretKey,
676) -> BlockId {
677    let nonce = make_block_nonce(locator, &content, read_key);
678    encrypt_block(read_key, &nonce, &mut content);
679
680    let block = Block::new(content, nonce);
681    let block_id = block.id;
682
683    changeset.link_block(
684        locator.encode(read_key),
685        block.id,
686        SingleBlockPresence::Present,
687    );
688    changeset.write_block(block);
689
690    tracing::trace!(?locator, ?block_id, "write block");
691
692    block_id
693}
694
695fn decrypt_block(blob_key: &cipher::SecretKey, block_nonce: &BlockNonce, content: &mut [u8]) {
696    let block_key = SecretKey::derive_from_key(blob_key.as_array(), block_nonce);
697    block_key.decrypt_no_aead(&Nonce::default(), content);
698}
699
700fn encrypt_block(blob_key: &cipher::SecretKey, block_nonce: &BlockNonce, content: &mut [u8]) {
701    let block_key = SecretKey::derive_from_key(blob_key.as_array(), block_nonce);
702    block_key.encrypt_no_aead(&Nonce::default(), content);
703}
704
705/// Compute nonce for a block at the given locator and with the given plaintext content.
706///
707/// This function is deterministic so for a given block at a given locator it produces the same
708/// nonce. This is not a nonce reuse because the only way two blocks can have the same nonce is if
709/// they have the same content and are at the same locator which means they are in fact the same
710/// block, just referenced from two different branches.
711///
712/// The reason nonces are computed this way instead of randomly is to guarantee two blocks with the
713/// same content at the same locators but in different branches have the same nonce and thus the
714/// same block_id even if they were created independently (as opposed to linking an existing block
715/// from another branch). This in turn guarantees that two branches with identical content have the
716/// same hash.
717///
718/// Note: `read_key` is used as an additional secret hashing material to prevent known plaintext
719/// attacks.
720fn make_block_nonce(
721    locator: &Locator,
722    plaintext_content: &[u8],
723    read_key: &cipher::SecretKey,
724) -> BlockNonce {
725    (read_key.as_ref(), locator, plaintext_content)
726        .hash()
727        .into()
728}