pub(crate) mod lock;

mod block_ids;
mod id;
mod position;

#[cfg(test)]
mod tests;

pub(crate) use self::{block_ids::BlockIds, id::BlobId};

use self::position::Position;
use crate::{
    branch::Branch,
    collections::HashMap,
    crypto::{
        cipher::{self, Nonce, SecretKey},
        sign::{Keypair, PublicKey},
        Hashable,
    },
    error::{Error, Result},
    protocol::{
        Block, BlockContent, BlockId, BlockNonce, Locator, RootNode, RootNodeFilter,
        SingleBlockPresence, BLOCK_SIZE,
    },
    store::{self, Changeset, ReadTransaction},
};
use std::{collections::hash_map::Entry, io::SeekFrom, iter, mem};
use thiserror::Error;

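/// Size of the blob header in bytes. The header is stored at the start of the blob's first block
/// and currently contains only the blob length encoded as a `u64`.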
pub const HEADER_SIZE: usize = mem::size_of::<u64>();

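/// Maximum number of blocks the in-memory cache holds before it must be flushed and the maximum
/// number of blocks linked per write transaction when forking a blob.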
pub const BATCH_SIZE: usize = 2048;

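/// Error returned by the non-async `read`/`write` operations on `Blob` when they can't make
/// progress using only the in-memory block cache.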
#[derive(Debug, Error)]
pub(crate) enum ReadWriteError {
    #[error("block not found in the cache")]
    CacheMiss,
    #[error("cache is full")]
    CacheFull,
}

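/// A blob is a length-prefixed byte stream stored in encrypted blocks within a branch.
///
/// Reads and writes go through an in-memory block cache. Dirty blocks and length changes are
/// persisted only when `flush` is called.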
pub(crate) struct Blob {
    branch: Branch,
    id: BlobId,
    cache: HashMap<u32, CachedBlock>,
    len_original: u64,
    len_modified: u64,
    position: Position,
}

impl Blob {
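    /// Opens an existing blob in the latest approved snapshot of `branch`.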
    pub async fn open(tx: &mut ReadTransaction, branch: Branch, id: BlobId) -> Result<Self> {
        let root_node = tx
            .load_latest_approved_root_node(branch.id(), RootNodeFilter::Any)
            .await?;
        Self::open_at(tx, &root_node, branch, id).await
    }

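    /// Opens an existing blob in the snapshot identified by `root_node`.
    ///
    /// # Panics
    ///
    /// Panics if `root_node` does not belong to `branch`.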
    pub async fn open_at(
        tx: &mut ReadTransaction,
        root_node: &RootNode,
        branch: Branch,
        id: BlobId,
    ) -> Result<Self> {
        assert_eq!(root_node.proof.writer_id, *branch.id());

        let (_, buffer) =
            read_block(tx, root_node, &Locator::head(id), branch.keys().read()).await?;

        let len = buffer.read_u64(0);
        let cached_block = CachedBlock::from(buffer);
        let cache = iter::once((0, cached_block)).collect();
        let position = Position::ZERO;

        Ok(Self {
            branch,
            id,
            cache,
            len_original: len,
            len_modified: len,
            position,
        })
    }

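    /// Creates a new, empty blob. Nothing is written to the store until the blob is flushed.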
    pub fn create(branch: Branch, id: BlobId) -> Self {
        let cached_block = CachedBlock::new().with_dirty(true);
        let cache = iter::once((0, cached_block)).collect();

        Self {
            branch,
            id,
            cache,
            len_original: 0,
            len_modified: 0,
            position: Position::ZERO,
        }
    }

    pub fn branch(&self) -> &Branch {
        &self.branch
    }

    pub fn id(&self) -> &BlobId {
        &self.id
    }

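    /// Length of the blob in bytes, including any not yet flushed modifications.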
    pub fn len(&self) -> u64 {
        self.len_modified
    }

    pub fn seek_position(&self) -> u64 {
        self.position.get()
    }

    pub fn block_count(&self) -> u32 {
        block_count(self.len())
    }

    pub fn is_dirty(&self) -> bool {
        self.cache.values().any(|block| block.dirty) || self.len_modified != self.len_original
    }

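    /// Moves the seek cursor, clamping the resulting position to `0..=len`, and returns the new
    /// position.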
    pub fn seek(&mut self, pos: SeekFrom) -> u64 {
        let position = match pos {
            SeekFrom::Start(n) => n.min(self.len()),
            SeekFrom::End(n) => {
                if n >= 0 {
                    self.len()
                } else {
                    self.len().saturating_sub((-n) as u64)
                }
            }
            SeekFrom::Current(n) => {
                if n >= 0 {
                    self.seek_position()
                        .saturating_add(n as u64)
                        .min(self.len())
                } else {
                    self.seek_position().saturating_sub((-n) as u64)
                }
            }
        };

        self.position.set(position);

        position
    }

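    /// Reads from the current seek position into `buffer` using only the in-memory cache.
    ///
    /// Returns the number of bytes read (zero at the end of the blob). Fails with
    /// `ReadWriteError::CacheMiss` if the current block must first be loaded with `warmup` and
    /// with `ReadWriteError::CacheFull` if the cache has to be flushed before another block can
    /// be loaded.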
    pub fn read(&mut self, buffer: &mut [u8]) -> Result<usize, ReadWriteError> {
        if self.position.get() >= self.len() {
            return Ok(0);
        }

        let block = match self.cache.get(&self.position.block) {
            Some(block) => block,
            None => {
                if self.check_cache_capacity() {
                    return Err(ReadWriteError::CacheMiss);
                } else {
                    return Err(ReadWriteError::CacheFull);
                }
            }
        };

        let read_len = buffer
            .len()
            .min(block.content.len() - self.position.offset)
            .min(self.len() as usize - self.position.get() as usize);

        block
            .content
            .read(self.position.offset, &mut buffer[..read_len]);

        self.position.advance(read_len);

        Ok(read_len)
    }

    #[cfg(test)]
    pub async fn read_all(&mut self, tx: &mut ReadTransaction, buffer: &mut [u8]) -> Result<usize> {
        let root_node = tx
            .load_latest_approved_root_node(self.branch.id(), RootNodeFilter::Any)
            .await?;
        self.read_all_at(tx, &root_node, buffer).await
    }

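    /// Reads into `buffer` until it is full or the end of the blob is reached, loading missing
    /// blocks from the snapshot identified by `root_node`. Returns the number of bytes read.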
    pub async fn read_all_at(
        &mut self,
        tx: &mut ReadTransaction,
        root_node: &RootNode,
        buffer: &mut [u8],
    ) -> Result<usize> {
        assert_eq!(root_node.proof.writer_id, *self.branch.id());

        let mut offset = 0;

        loop {
            match self.read(&mut buffer[offset..]) {
                Ok(0) => break,
                Ok(len) => {
                    offset += len;
                }
                Err(ReadWriteError::CacheMiss) => self.warmup_at(tx, root_node).await?,
                Err(ReadWriteError::CacheFull) => {
                    tracing::error!("cache full");
                    return Err(Error::OperationNotSupported);
                }
            }
        }

        Ok(offset)
    }

    #[cfg(test)]
    pub async fn read_to_end(&mut self, tx: &mut ReadTransaction) -> Result<Vec<u8>> {
        let root_node = tx
            .load_latest_approved_root_node(self.branch.id(), RootNodeFilter::Any)
            .await?;
        self.read_to_end_at(tx, &root_node).await
    }

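    /// Reads everything from the current seek position to the end of the blob into a new buffer,
    /// growing it in `CHUNK_SIZE` steps.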
    pub async fn read_to_end_at(
        &mut self,
        tx: &mut ReadTransaction,
        root_node: &RootNode,
    ) -> Result<Vec<u8>> {
        const CHUNK_SIZE: usize = 1024 * 1024;

        let total = (self.len() - self.seek_position())
            .try_into()
            .unwrap_or(usize::MAX);

        let mut buffer = Vec::new();
        let mut offset = 0;

        while offset < total {
            buffer.resize(buffer.len() + CHUNK_SIZE.min(total - offset), 0);
            offset += self
                .read_all_at(tx, root_node, &mut buffer[offset..])
                .await?;
        }

        Ok(buffer)
    }

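    /// Writes `buffer` at the current seek position using only the in-memory cache.
    ///
    /// Returns the number of bytes written. Fails with `ReadWriteError::CacheMiss` if the target
    /// block must first be loaded with `warmup` (partial overwrite of an existing block) and with
    /// `ReadWriteError::CacheFull` if the cache has to be flushed first.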
    pub fn write(&mut self, buffer: &[u8]) -> Result<usize, ReadWriteError> {
        if buffer.is_empty() {
            return Ok(0);
        }

        let block = match self.cache.get_mut(&self.position.block) {
            Some(block) => block,
            None => {
                if !self.check_cache_capacity() {
                    return Err(ReadWriteError::CacheFull);
                }

                if self.position.get() >= self.len_modified
                    || self.position.offset == 0 && buffer.len() >= BLOCK_SIZE
                {
                    self.cache.entry(self.position.block).or_default()
                } else {
                    return Err(ReadWriteError::CacheMiss);
                }
            }
        };

        let write_len = buffer.len().min(block.content.len() - self.position.offset);

        block
            .content
            .write(self.position.offset, &buffer[..write_len]);
        block.dirty = true;

        self.position.advance(write_len);
        self.len_modified = self.len_modified.max(self.position.get());

        Ok(write_len)
    }

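    /// Writes all of `buffer`, warming up the cache on misses and flushing dirty blocks into
    /// `changeset` whenever the cache fills up.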
    pub async fn write_all(
        &mut self,
        tx: &mut ReadTransaction,
        changeset: &mut Changeset,
        buffer: &[u8],
    ) -> Result<()> {
        let mut offset = 0;

        loop {
            match self.write(&buffer[offset..]) {
                Ok(0) => break,
                Ok(len) => {
                    offset += len;
                }
                Err(ReadWriteError::CacheMiss) => {
                    self.warmup(tx).await?;
                }
                Err(ReadWriteError::CacheFull) => {
                    self.flush(tx, changeset).await?;
                }
            }
        }

        Ok(())
    }

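    /// Loads the block at the current seek position into the cache (from the latest approved
    /// snapshot) unless it is already cached.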
    pub async fn warmup(&mut self, tx: &mut ReadTransaction) -> Result<()> {
        let root_node = tx
            .load_latest_approved_root_node(self.branch.id(), RootNodeFilter::Any)
            .await?;
        self.warmup_at(tx, &root_node).await?;

        Ok(())
    }

    pub async fn warmup_at(
        &mut self,
        tx: &mut ReadTransaction,
        root_node: &RootNode,
    ) -> Result<()> {
        match self.cache.entry(self.position.block) {
            Entry::Occupied(_) => (),
            Entry::Vacant(entry) => {
                let locator = Locator::head(self.id).nth(self.position.block);
                let (_, buffer) =
                    read_block(tx, root_node, &locator, self.branch.keys().read()).await?;
                entry.insert(CachedBlock::from(buffer));
            }
        }

        Ok(())
    }

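    /// Truncates the blob to `len` bytes, moving the seek cursor back if it would end up past the
    /// new end. Extending the blob this way is not supported and fails with
    /// `Error::OperationNotSupported`. The new length is persisted on the next flush.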
    pub fn truncate(&mut self, len: u64) -> Result<()> {
        if len == self.len() {
            return Ok(());
        }

        if len > self.len() {
            return Err(Error::OperationNotSupported);
        }

        if self.seek_position() > len {
            self.seek(SeekFrom::Start(len));
        }

        self.len_modified = len;

        Ok(())
    }

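    /// Writes the updated length and all dirty cached blocks into `changeset`. The changeset
    /// still needs to be applied for the changes to become persistent.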
    pub(crate) async fn flush(
        &mut self,
        tx: &mut ReadTransaction,
        changeset: &mut Changeset,
    ) -> Result<()> {
        self.write_len(tx, changeset).await?;
        self.write_blocks(changeset);

        Ok(())
    }

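    /// Records the removal of all of this blob's blocks into `changeset`.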
    pub(crate) fn remove(self, changeset: &mut Changeset) {
        let locators = Locator::head(self.id)
            .sequence()
            .take(self.block_count() as usize);

        for locator in locators {
            let encoded = locator.encode(self.branch().keys().read());
            changeset.unlink_block(encoded, None);
        }
    }

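    /// Makes sure there is room in the cache for one more block, evicting a clean block if
    /// necessary. Returns `false` if every cached block is dirty, in which case the cache must be
    /// flushed first.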
    fn check_cache_capacity(&mut self) -> bool {
        if self.cache.len() < BATCH_SIZE {
            return true;
        }

        let number = self
            .cache
            .iter()
            .find(|(_, block)| !block.dirty)
            .map(|(number, _)| *number);

        if let Some(number) = number {
            self.cache.remove(&number);
            true
        } else {
            false
        }
    }

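    /// If the blob length changed, writes the new length into the header of the first block.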
    async fn write_len(
        &mut self,
        tx: &mut ReadTransaction,
        changeset: &mut Changeset,
    ) -> Result<()> {
        if self.len_modified == self.len_original {
            return Ok(());
        }

        if let Some(block) = self.cache.get_mut(&0) {
            block.content.write_u64(0, self.len_modified);
            block.dirty = true;
        } else {
            let locator = Locator::head(self.id);
            let root_node = tx
                .load_latest_approved_root_node(self.branch.id(), RootNodeFilter::Any)
                .await?;
            let (_, mut content) =
                read_block(tx, &root_node, &locator, self.branch.keys().read()).await?;
            content.write_u64(0, self.len_modified);
            write_block(changeset, &locator, content, self.branch.keys().read());
        }

        self.len_original = self.len_modified;

        Ok(())
    }

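    /// Moves all dirty blocks out of the cache and writes them into `changeset`, keeping the
    /// clean ones cached.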
    fn write_blocks(&mut self, changeset: &mut Changeset) {
        let cache = mem::take(&mut self.cache);
        let (dirty, clean): (HashMap<_, _>, _) =
            cache.into_iter().partition(|(_, block)| block.dirty);
        self.cache = clean;

        for (number, block) in dirty {
            let locator = Locator::head(self.id).nth(number);
            write_block(
                changeset,
                &locator,
                block.content,
                self.branch.keys().read(),
            );
        }
    }
}

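// Note: cloning intentionally discards unflushed state - the clone starts with an empty cache and
// with the original (last flushed) length.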
impl Clone for Blob {
    fn clone(&self) -> Self {
        Self {
            branch: self.branch.clone(),
            id: self.id,
            cache: HashMap::default(),
            len_original: self.len_original,
            len_modified: self.len_original,
            position: self.position,
        }
    }
}

#[derive(Default)]
struct CachedBlock {
    content: BlockContent,
    dirty: bool,
}

impl CachedBlock {
    fn new() -> Self {
        Self::default()
    }

    fn with_dirty(self, dirty: bool) -> Self {
        Self { dirty, ..self }
    }
}

impl From<BlockContent> for CachedBlock {
    fn from(content: BlockContent) -> Self {
        Self {
            content,
            dirty: false,
        }
    }
}

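/// Forks the blob with `blob_id` from `src_branch` into `dst_branch` by linking the source blocks
/// into the destination branch. The work is committed in batches of `BATCH_SIZE` blocks, each in
/// its own write transaction.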
pub(crate) async fn fork(blob_id: BlobId, src_branch: &Branch, dst_branch: &Branch) -> Result<()> {
    if src_branch.id() == dst_branch.id() {
        return Ok(());
    }

    let read_key = src_branch.keys().read();
    let write_keys = dst_branch.keys().write().ok_or(Error::PermissionDenied)?;

    let end = {
        let mut tx = src_branch.store().begin_read().await?;
        let root_node = tx
            .load_latest_approved_root_node(src_branch.id(), RootNodeFilter::Any)
            .await?;
        load_block_count_hint(&mut tx, &root_node, blob_id, src_branch.keys().read()).await?
    };

    struct Batch {
        tx: crate::store::WriteTransaction,
        changeset: Changeset,
    }

    impl Batch {
        async fn apply(self, dst_branch_id: &PublicKey, write_keys: &Keypair) -> Result<()> {
            let Batch { mut tx, changeset } = self;
            changeset.apply(&mut tx, dst_branch_id, write_keys).await?;
            tx.commit().await?;
            Ok(())
        }
    }

    let mut batch = None;
    let locators = Locator::head(blob_id).sequence().take(end as usize);

    for locator in locators {
        let Batch { tx, changeset } = if let Some(batch) = &mut batch {
            batch
        } else {
            batch.insert(Batch {
                tx: src_branch.store().begin_write().await?,
                changeset: Changeset::new(),
            })
        };

        let encoded_locator = locator.encode(read_key);

        let (block_id, block_presence) =
            match tx.find_block(src_branch.id(), &encoded_locator).await {
                Ok(id) => id,
                Err(store::Error::LocatorNotFound) => {
                    break;
                }
                Err(error) => return Err(error.into()),
            };

        changeset.link_block(encoded_locator, block_id, block_presence);

        if (locator.number() + 1) as usize % BATCH_SIZE == 0 {
            if let Some(batch) = batch.take() {
                batch.apply(dst_branch.id(), write_keys).await?;
            }
        }

        tracing::trace!(
            num = locator.number(),
            block_id = ?block_id,
            ?block_presence,
            "fork block",
        );
    }

    if let Some(batch) = batch {
        batch.apply(dst_branch.id(), write_keys).await?;
    }

    Ok(())
}

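/// Number of blocks needed to store `len` bytes of content plus the header, that is
/// `ceil((len + HEADER_SIZE) / BLOCK_SIZE)`.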
fn block_count(len: u64) -> u32 {
    (1 + (len + HEADER_SIZE as u64 - 1) / BLOCK_SIZE as u64)
        .try_into()
        .unwrap_or(u32::MAX)
}

async fn read_len(
    tx: &mut ReadTransaction,
    root_node: &RootNode,
    blob_id: BlobId,
    read_key: &cipher::SecretKey,
) -> Result<u64> {
    let (_, buffer) = read_block(tx, root_node, &Locator::head(blob_id), read_key).await?;
    Ok(buffer.read_u64(0))
}

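/// Best-effort block count of the blob: if the head block (which stores the length) is not
/// available locally, returns `u32::MAX` so the caller simply iterates until it runs out of
/// locators.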
async fn load_block_count_hint(
    tx: &mut ReadTransaction,
    root_node: &RootNode,
    blob_id: BlobId,
    read_key: &cipher::SecretKey,
) -> Result<u32> {
    match read_len(tx, root_node, blob_id, read_key).await {
        Ok(len) => Ok(block_count(len)),
        Err(Error::Store(store::Error::BlockNotFound)) => Ok(u32::MAX),
        Err(error) => Err(error),
    }
}

async fn read_block(
    tx: &mut ReadTransaction,
    root_node: &RootNode,
    locator: &Locator,
    read_key: &cipher::SecretKey,
) -> Result<(BlockId, BlockContent)> {
    let (block_id, _) = tx
        .find_block_at(root_node, &locator.encode(read_key))
        .await
        .inspect_err(|error| {
            tracing::trace!(?error, root_hash = ?root_node.proof.hash, ?locator);
        })?;

    let mut content = BlockContent::new();
    let nonce = tx
        .read_block(&block_id, &mut content)
        .await
        .inspect_err(|error| {
            tracing::trace!(?error, root_hash = ?root_node.proof.hash, ?locator, ?block_id);
        })?;

    decrypt_block(read_key, &nonce, &mut content);

    Ok((block_id, content))
}

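/// Encrypts `content`, records the resulting block and its locator link in `changeset` and
/// returns the id of the new block.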
fn write_block(
    changeset: &mut Changeset,
    locator: &Locator,
    mut content: BlockContent,
    read_key: &cipher::SecretKey,
) -> BlockId {
    let nonce = make_block_nonce(locator, &content, read_key);
    encrypt_block(read_key, &nonce, &mut content);

    let block = Block::new(content, nonce);
    let block_id = block.id;

    changeset.link_block(
        locator.encode(read_key),
        block.id,
        SingleBlockPresence::Present,
    );
    changeset.write_block(block);

    tracing::trace!(?locator, ?block_id, "write block");

    block_id
}

fn decrypt_block(blob_key: &cipher::SecretKey, block_nonce: &BlockNonce, content: &mut [u8]) {
    let block_key = SecretKey::derive_from_key(blob_key.as_array(), block_nonce);
    block_key.decrypt_no_aead(&Nonce::default(), content);
}

fn encrypt_block(blob_key: &cipher::SecretKey, block_nonce: &BlockNonce, content: &mut [u8]) {
    let block_key = SecretKey::derive_from_key(blob_key.as_array(), block_nonce);
    block_key.encrypt_no_aead(&Nonce::default(), content);
}

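/// The block nonce is a hash of the read key, the locator and the plaintext, so encryption is
/// deterministic: the same plaintext at the same locator (under the same key) always produces the
/// same block.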
fn make_block_nonce(
    locator: &Locator,
    plaintext_content: &[u8],
    read_key: &cipher::SecretKey,
) -> BlockNonce {
    (read_key.as_ref(), locator, plaintext_content)
        .hash()
        .into()
}