1pub(crate) mod lock;
2
3mod block_ids;
4mod id;
5mod position;
6
7#[cfg(test)]
8mod tests;
9
10pub(crate) use self::{block_ids::BlockIds, id::BlobId};
11
12use self::position::Position;
13use crate::{
14 branch::Branch,
15 crypto::{
16 Hashable,
17 cipher::{self, Nonce, SecretKey},
18 sign::{Keypair, PublicKey},
19 },
20 error::{Error, Result},
21 protocol::{
22 BLOCK_SIZE, Block, BlockContent, BlockId, BlockNonce, Locator, RootNode, RootNodeFilter,
23 SingleBlockPresence,
24 },
25 store::{self, Changeset, ReadTransaction},
26};
27use std::{
28 collections::{HashMap, hash_map::Entry},
29 io::SeekFrom,
30 iter, mem,
31};
32use thiserror::Error;
33
34pub const HEADER_SIZE: usize = mem::size_of::<u64>();
38
39pub const BATCH_SIZE: usize = 2048; #[derive(Debug, Error)]
45pub(crate) enum ReadWriteError {
46 #[error("block not found in the cache")]
47 CacheMiss,
48 #[error("cache is full")]
49 CacheFull,
50}
51
// A blob is a variable-length byte sequence stored as a series of encrypted
// blocks. The first block additionally carries a header with the length.
pub(crate) struct Blob {
    branch: Branch,
    id: BlobId,
    // In-memory cache of loaded/modified blocks keyed by block number within
    // the blob. Bounded by `BATCH_SIZE` (see `check_cache_capacity`).
    cache: HashMap<u32, CachedBlock>,
    // Length in bytes as last persisted to the store.
    len_original: u64,
    // Length in bytes including unflushed modifications.
    len_modified: u64,
    // Current seek position (block number + offset within the block).
    position: Position,
}
60
61impl Blob {
62 pub async fn open(tx: &mut ReadTransaction, branch: Branch, id: BlobId) -> Result<Self> {
64 let root_node = tx
65 .load_latest_approved_root_node(branch.id(), RootNodeFilter::Any)
66 .await?;
67 Self::open_at(tx, &root_node, branch, id).await
68 }
69
70 pub async fn open_at(
71 tx: &mut ReadTransaction,
72 root_node: &RootNode,
73 branch: Branch,
74 id: BlobId,
75 ) -> Result<Self> {
76 assert_eq!(root_node.proof.writer_id, *branch.id());
77
78 let (_, buffer) =
79 read_block(tx, root_node, &Locator::head(id), branch.keys().read()).await?;
80
81 let len = buffer.read_u64(0);
82 let cached_block = CachedBlock::from(buffer);
83 let cache = iter::once((0, cached_block)).collect();
84 let position = Position::ZERO;
85
86 Ok(Self {
87 branch,
88 id,
89 cache,
90 len_original: len,
91 len_modified: len,
92 position,
93 })
94 }
95
96 pub fn create(branch: Branch, id: BlobId) -> Self {
98 let cached_block = CachedBlock::new().with_dirty(true);
99 let cache = iter::once((0, cached_block)).collect();
100
101 Self {
102 branch,
103 id,
104 cache,
105 len_original: 0,
106 len_modified: 0,
107 position: Position::ZERO,
108 }
109 }
110
111 pub fn branch(&self) -> &Branch {
112 &self.branch
113 }
114
115 pub fn id(&self) -> &BlobId {
117 &self.id
118 }
119
120 pub fn len(&self) -> u64 {
122 self.len_modified
123 }
124
125 pub fn seek_position(&self) -> u64 {
127 self.position.get()
128 }
129
130 pub fn block_count(&self) -> u32 {
131 block_count(self.len())
132 }
133
134 pub fn is_dirty(&self) -> bool {
136 self.cache.values().any(|block| block.dirty) || self.len_modified != self.len_original
137 }
138
139 pub fn seek(&mut self, pos: SeekFrom) -> u64 {
146 let position = match pos {
147 SeekFrom::Start(n) => n.min(self.len()),
148 SeekFrom::End(n) => {
149 if n >= 0 {
150 self.len()
151 } else {
152 self.len().saturating_sub((-n) as u64)
153 }
154 }
155 SeekFrom::Current(n) => {
156 if n >= 0 {
157 self.seek_position()
158 .saturating_add(n as u64)
159 .min(self.len())
160 } else {
161 self.seek_position().saturating_sub((-n) as u64)
162 }
163 }
164 };
165
166 self.position.set(position);
167
168 position
169 }
170
171 pub fn read(&mut self, buffer: &mut [u8]) -> Result<usize, ReadWriteError> {
174 if self.position.get() >= self.len() {
175 return Ok(0);
176 }
177
178 let block = match self.cache.get(&self.position.block) {
179 Some(block) => block,
180 None => {
181 if self.check_cache_capacity() {
182 return Err(ReadWriteError::CacheMiss);
183 } else {
184 return Err(ReadWriteError::CacheFull);
185 }
186 }
187 };
188
189 let read_len = buffer
194 .len()
195 .min(block.content.len() - self.position.offset)
196 .min(self.len() as usize - self.position.get() as usize);
197
198 block
199 .content
200 .read(self.position.offset, &mut buffer[..read_len]);
201
202 self.position.advance(read_len);
203
204 Ok(read_len)
205 }
206
207 #[cfg(test)]
208 pub async fn read_all(&mut self, tx: &mut ReadTransaction, buffer: &mut [u8]) -> Result<usize> {
209 let root_node = tx
210 .load_latest_approved_root_node(self.branch.id(), RootNodeFilter::Any)
211 .await?;
212 self.read_all_at(tx, &root_node, buffer).await
213 }
214
215 pub async fn read_all_at(
216 &mut self,
217 tx: &mut ReadTransaction,
218 root_node: &RootNode,
219 buffer: &mut [u8],
220 ) -> Result<usize> {
221 assert_eq!(root_node.proof.writer_id, *self.branch.id());
222
223 let mut offset = 0;
224
225 loop {
226 match self.read(&mut buffer[offset..]) {
227 Ok(0) => break,
228 Ok(len) => {
229 offset += len;
230 }
231 Err(ReadWriteError::CacheMiss) => self.warmup_at(tx, root_node).await?,
232 Err(ReadWriteError::CacheFull) => {
233 tracing::error!("cache full");
234 return Err(Error::OperationNotSupported);
235 }
236 }
237 }
238
239 Ok(offset)
240 }
241
242 #[cfg(test)]
245 pub async fn read_to_end(&mut self, tx: &mut ReadTransaction) -> Result<Vec<u8>> {
246 let root_node = tx
247 .load_latest_approved_root_node(self.branch.id(), RootNodeFilter::Any)
248 .await?;
249 self.read_to_end_at(tx, &root_node).await
250 }
251
252 pub async fn read_to_end_at(
255 &mut self,
256 tx: &mut ReadTransaction,
257 root_node: &RootNode,
258 ) -> Result<Vec<u8>> {
259 const CHUNK_SIZE: usize = 1024 * 1024;
262
263 let total = (self.len() - self.seek_position())
264 .try_into()
265 .unwrap_or(usize::MAX);
266
267 let mut buffer = Vec::new();
268 let mut offset = 0;
269
270 while offset < total {
271 buffer.resize(buffer.len() + CHUNK_SIZE.min(total - offset), 0);
272 offset += self
273 .read_all_at(tx, root_node, &mut buffer[offset..])
274 .await?;
275 }
276
277 Ok(buffer)
278 }
279
280 pub fn write(&mut self, buffer: &[u8]) -> Result<usize, ReadWriteError> {
281 if buffer.is_empty() {
282 return Ok(0);
283 }
284
285 let block = match self.cache.get_mut(&self.position.block) {
286 Some(block) => block,
287 None => {
288 if !self.check_cache_capacity() {
289 return Err(ReadWriteError::CacheFull);
290 }
291
292 if self.position.get() >= self.len_modified
293 || self.position.offset == 0 && buffer.len() >= BLOCK_SIZE
294 {
295 self.cache.entry(self.position.block).or_default()
296 } else {
297 return Err(ReadWriteError::CacheMiss);
298 }
299 }
300 };
301
302 let write_len = buffer.len().min(block.content.len() - self.position.offset);
303
304 block
305 .content
306 .write(self.position.offset, &buffer[..write_len]);
307 block.dirty = true;
308
309 self.position.advance(write_len);
310 self.len_modified = self.len_modified.max(self.position.get());
311
312 Ok(write_len)
313 }
314
315 pub async fn write_all(
316 &mut self,
317 tx: &mut ReadTransaction,
318 changeset: &mut Changeset,
319 buffer: &[u8],
320 ) -> Result<()> {
321 let mut offset = 0;
322
323 loop {
324 match self.write(&buffer[offset..]) {
325 Ok(0) => break,
326 Ok(len) => {
327 offset += len;
328 }
329 Err(ReadWriteError::CacheMiss) => {
330 self.warmup(tx).await?;
331 }
332 Err(ReadWriteError::CacheFull) => {
333 self.flush(tx, changeset).await?;
334 }
335 }
336 }
337
338 Ok(())
339 }
340
341 pub async fn warmup(&mut self, tx: &mut ReadTransaction) -> Result<()> {
343 let root_node = tx
344 .load_latest_approved_root_node(self.branch.id(), RootNodeFilter::Any)
345 .await?;
346 self.warmup_at(tx, &root_node).await?;
347
348 Ok(())
349 }
350
351 pub async fn warmup_at(
353 &mut self,
354 tx: &mut ReadTransaction,
355 root_node: &RootNode,
356 ) -> Result<()> {
357 match self.cache.entry(self.position.block) {
358 Entry::Occupied(_) => (),
359 Entry::Vacant(entry) => {
360 let locator = Locator::head(self.id).nth(self.position.block);
361 let (_, buffer) =
362 read_block(tx, root_node, &locator, self.branch.keys().read()).await?;
363 entry.insert(CachedBlock::from(buffer));
364 }
365 }
366
367 Ok(())
368 }
369
370 pub fn truncate(&mut self, len: u64) -> Result<()> {
372 if len == self.len() {
373 return Ok(());
374 }
375
376 if len > self.len() {
377 return Err(Error::OperationNotSupported);
379 }
380
381 if self.seek_position() > len {
382 self.seek(SeekFrom::Start(len));
383 }
384
385 self.len_modified = len;
386
387 Ok(())
388 }
389
390 pub(crate) async fn flush(
393 &mut self,
394 tx: &mut ReadTransaction,
395 changeset: &mut Changeset,
396 ) -> Result<()> {
397 self.write_len(tx, changeset).await?;
398 self.write_blocks(changeset);
399
400 Ok(())
401 }
402
403 pub(crate) fn remove(self, changeset: &mut Changeset) {
405 let locators = Locator::head(self.id)
406 .sequence()
407 .take(self.block_count() as usize);
408
409 for locator in locators {
410 let encoded = locator.encode(self.branch().keys().read());
411 changeset.unlink_block(encoded, None);
412 }
413 }
414
415 fn check_cache_capacity(&mut self) -> bool {
416 if self.cache.len() < BATCH_SIZE {
417 return true;
418 }
419
420 let number = self
421 .cache
422 .iter()
423 .find(|(_, block)| !block.dirty)
424 .map(|(number, _)| *number);
425
426 if let Some(number) = number {
427 self.cache.remove(&number);
428 true
429 } else {
430 false
431 }
432 }
433
434 async fn write_len(
436 &mut self,
437 tx: &mut ReadTransaction,
438 changeset: &mut Changeset,
439 ) -> Result<()> {
440 if self.len_modified == self.len_original {
441 return Ok(());
442 }
443
444 if let Some(block) = self.cache.get_mut(&0) {
445 block.content.write_u64(0, self.len_modified);
446 block.dirty = true;
447 } else {
448 let locator = Locator::head(self.id);
449 let root_node = tx
450 .load_latest_approved_root_node(self.branch.id(), RootNodeFilter::Any)
451 .await?;
452 let (_, mut content) =
453 read_block(tx, &root_node, &locator, self.branch.keys().read()).await?;
454 content.write_u64(0, self.len_modified);
455 write_block(changeset, &locator, content, self.branch.keys().read());
456 }
457
458 self.len_original = self.len_modified;
459
460 Ok(())
461 }
462
463 fn write_blocks(&mut self, changeset: &mut Changeset) {
464 let cache = mem::take(&mut self.cache);
466 let (dirty, clean): (HashMap<_, _>, _) =
467 cache.into_iter().partition(|(_, block)| block.dirty);
468 self.cache = clean;
469
470 for (number, block) in dirty {
471 let locator = Locator::head(self.id).nth(number);
472 write_block(
473 changeset,
474 &locator,
475 block.content,
476 self.branch.keys().read(),
477 );
478 }
479 }
480}
481
// Deliberately not derived: a clone starts with an empty cache and its length
// reset to the last persisted value, i.e. it does NOT inherit the original's
// unflushed modifications.
impl Clone for Blob {
    fn clone(&self) -> Self {
        Self {
            branch: self.branch.clone(),
            id: self.id,
            // Dirty blocks remain owned by the original only.
            cache: HashMap::default(),
            len_original: self.len_original,
            // Reset to the persisted length — unflushed length changes are
            // not carried over to the clone.
            len_modified: self.len_original,
            position: self.position,
        }
    }
}
495
// A single block held in a `Blob`'s in-memory cache.
#[derive(Default)]
struct CachedBlock {
    content: BlockContent,
    // Whether `content` differs from what's persisted in the store.
    dirty: bool,
}
501
502impl CachedBlock {
503 fn new() -> Self {
504 Self::default()
505 }
506
507 fn with_dirty(self, dirty: bool) -> Self {
508 Self { dirty, ..self }
509 }
510}
511
512impl From<BlockContent> for CachedBlock {
513 fn from(content: BlockContent) -> Self {
514 Self {
515 content,
516 dirty: false,
517 }
518 }
519}
520
/// Forks the blob with id `blob_id` from `src_branch` into `dst_branch` by
/// re-linking its blocks in the destination branch (the block contents are
/// not copied). No-op when both branches are the same.
pub(crate) async fn fork(blob_id: BlobId, src_branch: &Branch, dst_branch: &Branch) -> Result<()> {
    if src_branch.id() == dst_branch.id() {
        return Ok(());
    }

    let read_key = src_branch.keys().read();
    // Forking requires write access to the destination branch.
    let write_keys = dst_branch.keys().write().ok_or(Error::PermissionDenied)?;

    // Upper bound on the number of blocks to fork, derived from the length in
    // the blob header (`u32::MAX` when the head block is unavailable).
    let end = {
        let mut tx = src_branch.store().begin_read().await?;
        let root_node = tx
            .load_latest_approved_root_node(src_branch.id(), RootNodeFilter::Any)
            .await?;
        load_block_count_hint(&mut tx, &root_node, blob_id, src_branch.keys().read()).await?
    };

    // A write transaction together with the pending changes to apply in it.
    struct Batch {
        tx: crate::store::WriteTransaction,
        changeset: Changeset,
    }

    impl Batch {
        // Applies the changeset to the destination branch and commits.
        async fn apply(self, dst_branch_id: &PublicKey, write_keys: &Keypair) -> Result<()> {
            let Batch { mut tx, changeset } = self;
            changeset.apply(&mut tx, dst_branch_id, write_keys).await?;
            tx.commit().await?;
            Ok(())
        }
    }

    let mut batch = None;
    let locators = Locator::head(blob_id).sequence().take(end as usize);

    for locator in locators {
        // Lazily start a new batch if none is in progress.
        let Batch { tx, changeset } = if let Some(batch) = &mut batch {
            batch
        } else {
            batch.insert(Batch {
                tx: src_branch.store().begin_write().await?,
                changeset: Changeset::new(),
            })
        };

        let encoded_locator = locator.encode(read_key);

        let (block_id, block_presence) =
            match tx.find_block(src_branch.id(), &encoded_locator).await {
                Ok(id) => id,
                Err(store::Error::LocatorNotFound) => {
                    // `end` is only a hint — the blob may have fewer blocks.
                    break;
                }
                Err(error) => return Err(error.into()),
            };

        changeset.link_block(encoded_locator, block_id, block_presence);

        // Commit after every `BATCH_SIZE` blocks.
        if ((locator.number() + 1) as usize).is_multiple_of(BATCH_SIZE)
            && let Some(batch) = batch.take()
        {
            batch.apply(dst_branch.id(), write_keys).await?;
        }

        tracing::trace!(
            num = locator.number(),
            block_id = ?block_id,
            ?block_presence,
            "fork block",
        );
    }

    // Apply whatever remains in the last (partial) batch.
    if let Some(batch) = batch {
        batch.apply(dst_branch.id(), write_keys).await?;
    }

    Ok(())
}
612
613fn block_count(len: u64) -> u32 {
614 (1 + (len + HEADER_SIZE as u64 - 1) / BLOCK_SIZE as u64)
616 .try_into()
617 .unwrap_or(u32::MAX)
618}
619
620async fn read_len(
621 tx: &mut ReadTransaction,
622 root_node: &RootNode,
623 blob_id: BlobId,
624 read_key: &cipher::SecretKey,
625) -> Result<u64> {
626 let (_, buffer) = read_block(tx, root_node, &Locator::head(blob_id), read_key).await?;
627 Ok(buffer.read_u64(0))
628}
629
630async fn load_block_count_hint(
633 tx: &mut ReadTransaction,
634 root_node: &RootNode,
635 blob_id: BlobId,
636 read_key: &cipher::SecretKey,
637) -> Result<u32> {
638 match read_len(tx, root_node, blob_id, read_key).await {
639 Ok(len) => Ok(block_count(len)),
640 Err(Error::Store(store::Error::BlockNotFound)) => Ok(u32::MAX),
641 Err(error) => Err(error),
642 }
643}
644
645async fn read_block(
646 tx: &mut ReadTransaction,
647 root_node: &RootNode,
648 locator: &Locator,
649 read_key: &cipher::SecretKey,
650) -> Result<(BlockId, BlockContent)> {
651 let (block_id, _) = tx
652 .find_block_at(root_node, &locator.encode(read_key))
653 .await
654 .inspect_err(|error| {
655 tracing::trace!(?error, root_hash = ?root_node.proof.hash, ?locator);
656 })?;
657
658 let mut content = BlockContent::new();
659 let nonce = tx
660 .read_block(&block_id, &mut content)
661 .await
662 .inspect_err(|error| {
663 tracing::trace!(?error, root_hash = ?root_node.proof.hash, ?locator, ?block_id);
664 })?;
665
666 decrypt_block(read_key, &nonce, &mut content);
667
668 Ok((block_id, content))
669}
670
671fn write_block(
672 changeset: &mut Changeset,
673 locator: &Locator,
674 mut content: BlockContent,
675 read_key: &cipher::SecretKey,
676) -> BlockId {
677 let nonce = make_block_nonce(locator, &content, read_key);
678 encrypt_block(read_key, &nonce, &mut content);
679
680 let block = Block::new(content, nonce);
681 let block_id = block.id;
682
683 changeset.link_block(
684 locator.encode(read_key),
685 block.id,
686 SingleBlockPresence::Present,
687 );
688 changeset.write_block(block);
689
690 tracing::trace!(?locator, ?block_id, "write block");
691
692 block_id
693}
694
695fn decrypt_block(blob_key: &cipher::SecretKey, block_nonce: &BlockNonce, content: &mut [u8]) {
696 let block_key = SecretKey::derive_from_key(blob_key.as_array(), block_nonce);
697 block_key.decrypt_no_aead(&Nonce::default(), content);
698}
699
700fn encrypt_block(blob_key: &cipher::SecretKey, block_nonce: &BlockNonce, content: &mut [u8]) {
701 let block_key = SecretKey::derive_from_key(blob_key.as_array(), block_nonce);
702 block_key.encrypt_no_aead(&Nonce::default(), content);
703}
704
705fn make_block_nonce(
721 locator: &Locator,
722 plaintext_content: &[u8],
723 read_key: &cipher::SecretKey,
724) -> BlockNonce {
725 (read_key.as_ref(), locator, plaintext_content)
726 .hash()
727 .into()
728}