pub(crate) mod lock;
mod block_ids;
mod id;
mod position;
#[cfg(test)]
mod tests;
pub(crate) use self::{block_ids::BlockIds, id::BlobId};
use self::position::Position;
use crate::{
branch::Branch,
collections::{hash_map::Entry, HashMap},
crypto::{
cipher::{self, Nonce, SecretKey},
sign::{Keypair, PublicKey},
Hashable,
},
error::{Error, Result},
protocol::{
Block, BlockContent, BlockId, BlockNonce, Locator, RootNode, RootNodeFilter,
SingleBlockPresence, BLOCK_SIZE,
},
store::{self, Changeset, ReadTransaction},
};
use std::{io::SeekFrom, iter, mem};
use thiserror::Error;
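
/// Size of the length header stored at the start of a blob's first block.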
pub const HEADER_SIZE: usize = mem::size_of::<u64>();
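
/// Number of blocks processed per batch (e.g., per write transaction when forking a blob); also
/// used as the capacity of the in-memory block cache.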
pub const BATCH_SIZE: usize = 2048;

#[derive(Debug, Error)]
pub(crate) enum ReadWriteError {
#[error("block not found in the cache")]
CacheMiss,
#[error("cache is full")]
CacheFull,
}
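
/// A blob is a length-prefixed sequence of bytes stored in a branch as a run of encrypted blocks.
/// The first `HEADER_SIZE` bytes of the first block hold the length. Reads and writes go through
/// an in-memory block cache; dirty blocks are persisted by `flush`.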
pub(crate) struct Blob {
branch: Branch,
id: BlobId,
    /// Cache of loaded blocks, keyed by block number within the blob.
    cache: HashMap<u32, CachedBlock>,
    /// Length in bytes as of the last flush.
    len_original: u64,
    /// Length in bytes including unflushed writes and truncations.
    len_modified: u64,
    /// Current seek position.
    position: Position,
}
impl Blob {
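    /// Opens an existing blob using the latest approved root node of its branch.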
pub async fn open(tx: &mut ReadTransaction, branch: Branch, id: BlobId) -> Result<Self> {
let root_node = tx
.load_latest_approved_root_node(branch.id(), RootNodeFilter::Any)
.await?;
Self::open_at(tx, &root_node, branch, id).await
}
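
    /// Opens an existing blob at the given root node, which must belong to this blob's branch.
    /// The head block is fetched to read the length header and kept in the cache.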
pub async fn open_at(
tx: &mut ReadTransaction,
root_node: &RootNode,
branch: Branch,
id: BlobId,
) -> Result<Self> {
assert_eq!(root_node.proof.writer_id, *branch.id());
let (_, buffer) =
read_block(tx, root_node, &Locator::head(id), branch.keys().read()).await?;
let len = buffer.read_u64(0);
let cached_block = CachedBlock::from(buffer);
let cache = iter::once((0, cached_block)).collect();
let position = Position::ZERO;
Ok(Self {
branch,
id,
cache,
len_original: len,
len_modified: len,
position,
})
}
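
    /// Creates a new, empty blob. The head block starts out dirty so the length header gets
    /// written out on the first flush.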
pub fn create(branch: Branch, id: BlobId) -> Self {
let cached_block = CachedBlock::new().with_dirty(true);
let cache = iter::once((0, cached_block)).collect();
Self {
branch,
id,
cache,
len_original: 0,
len_modified: 0,
position: Position::ZERO,
}
}
pub fn branch(&self) -> &Branch {
&self.branch
}
pub fn id(&self) -> &BlobId {
&self.id
}
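
    /// Length of the blob in bytes, including unflushed modifications.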
pub fn len(&self) -> u64 {
self.len_modified
}
pub fn seek_position(&self) -> u64 {
self.position.get()
}
pub fn block_count(&self) -> u32 {
block_count(self.len())
}
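
    /// Whether this blob has modifications that haven't been flushed yet.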
pub fn is_dirty(&self) -> bool {
self.cache.values().any(|block| block.dirty) || self.len_modified != self.len_original
}
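
    /// Moves the seek position, clamping it to `0..=len`, and returns the new position.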
pub fn seek(&mut self, pos: SeekFrom) -> u64 {
let position = match pos {
SeekFrom::Start(n) => n.min(self.len()),
SeekFrom::End(n) => {
if n >= 0 {
self.len()
} else {
self.len().saturating_sub((-n) as u64)
}
}
SeekFrom::Current(n) => {
if n >= 0 {
self.seek_position()
.saturating_add(n as u64)
.min(self.len())
} else {
self.seek_position().saturating_sub((-n) as u64)
}
}
};
self.position.set(position);
position
}
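
    /// Reads from the current seek position into `buffer`, returning the number of bytes read
    /// (at most up to the end of the current block). Fails with `CacheMiss` if the block isn't
    /// cached yet (call `warmup` and retry) or with `CacheFull` if there is no room to cache it.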
pub fn read(&mut self, buffer: &mut [u8]) -> Result<usize, ReadWriteError> {
if self.position.get() >= self.len() {
return Ok(0);
}
let block = match self.cache.get(&self.position.block) {
Some(block) => block,
None => {
if self.check_cache_capacity() {
return Err(ReadWriteError::CacheMiss);
} else {
return Err(ReadWriteError::CacheFull);
}
}
};
let read_len = buffer
.len()
.min(block.content.len() - self.position.offset)
.min(self.len() as usize - self.position.get() as usize);
block
.content
.read(self.position.offset, &mut buffer[..read_len]);
self.position.advance(read_len);
Ok(read_len)
}
#[cfg(test)]
pub async fn read_all(&mut self, tx: &mut ReadTransaction, buffer: &mut [u8]) -> Result<usize> {
let root_node = tx
.load_latest_approved_root_node(self.branch.id(), RootNodeFilter::Any)
.await?;
self.read_all_at(tx, &root_node, buffer).await
}
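
    /// Like `read`, but reads until `buffer` is full or the end of the blob is reached, loading
    /// missing blocks from the store as needed.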
pub async fn read_all_at(
&mut self,
tx: &mut ReadTransaction,
root_node: &RootNode,
buffer: &mut [u8],
) -> Result<usize> {
assert_eq!(root_node.proof.writer_id, *self.branch.id());
let mut offset = 0;
loop {
match self.read(&mut buffer[offset..]) {
Ok(0) => break,
Ok(len) => {
offset += len;
}
Err(ReadWriteError::CacheMiss) => self.warmup_at(tx, root_node).await?,
Err(ReadWriteError::CacheFull) => {
tracing::error!("cache full");
return Err(Error::OperationNotSupported);
}
}
}
Ok(offset)
}
#[cfg(test)]
pub async fn read_to_end(&mut self, tx: &mut ReadTransaction) -> Result<Vec<u8>> {
let root_node = tx
.load_latest_approved_root_node(self.branch.id(), RootNodeFilter::Any)
.await?;
self.read_to_end_at(tx, &root_node).await
}
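
    /// Reads everything from the current seek position to the end of the blob into a vector,
    /// growing it in fixed-size chunks.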
pub async fn read_to_end_at(
&mut self,
tx: &mut ReadTransaction,
root_node: &RootNode,
) -> Result<Vec<u8>> {
const CHUNK_SIZE: usize = 1024 * 1024;
let total = (self.len() - self.seek_position())
.try_into()
.unwrap_or(usize::MAX);
let mut buffer = Vec::new();
let mut offset = 0;
while offset < total {
buffer.resize(buffer.len() + CHUNK_SIZE.min(total - offset), 0);
offset += self
.read_all_at(tx, root_node, &mut buffer[offset..])
.await?;
}
Ok(buffer)
}
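
    /// Writes `buffer` at the current seek position, returning the number of bytes written (at
    /// most up to the end of the current block). Fails with `CacheMiss` if the target block needs
    /// to be fetched first, or with `CacheFull` if the cache needs to be flushed first.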
pub fn write(&mut self, buffer: &[u8]) -> Result<usize, ReadWriteError> {
if buffer.is_empty() {
return Ok(0);
}
let block = match self.cache.get_mut(&self.position.block) {
Some(block) => block,
None => {
if !self.check_cache_capacity() {
return Err(ReadWriteError::CacheFull);
}
                // A block that lies entirely past the current end, or one that is about to be
                // fully overwritten, doesn't need to be fetched - start from a zeroed block.
                if self.position.get() >= self.len_modified
                    || self.position.offset == 0 && buffer.len() >= BLOCK_SIZE
                {
self.cache.entry(self.position.block).or_default()
} else {
return Err(ReadWriteError::CacheMiss);
}
}
};
let write_len = buffer.len().min(block.content.len() - self.position.offset);
block
.content
.write(self.position.offset, &buffer[..write_len]);
block.dirty = true;
self.position.advance(write_len);
self.len_modified = self.len_modified.max(self.position.get());
Ok(write_len)
}
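
    /// Writes all of `buffer`, fetching blocks on cache misses and flushing to `changeset` when
    /// the cache fills up.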
pub async fn write_all(
&mut self,
tx: &mut ReadTransaction,
changeset: &mut Changeset,
buffer: &[u8],
) -> Result<()> {
let mut offset = 0;
loop {
match self.write(&buffer[offset..]) {
Ok(0) => break,
Ok(len) => {
offset += len;
}
Err(ReadWriteError::CacheMiss) => {
self.warmup(tx).await?;
}
Err(ReadWriteError::CacheFull) => {
self.flush(tx, changeset).await?;
}
}
}
Ok(())
}
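
    /// Loads the block at the current seek position into the cache, unless already present.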
pub async fn warmup(&mut self, tx: &mut ReadTransaction) -> Result<()> {
let root_node = tx
.load_latest_approved_root_node(self.branch.id(), RootNodeFilter::Any)
.await?;
self.warmup_at(tx, &root_node).await?;
Ok(())
}
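
    /// Like `warmup`, but reads the block at the given root node.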
pub async fn warmup_at(
&mut self,
tx: &mut ReadTransaction,
root_node: &RootNode,
) -> Result<()> {
match self.cache.entry(self.position.block) {
Entry::Occupied(_) => (),
Entry::Vacant(entry) => {
let locator = Locator::head(self.id).nth(self.position.block);
let (_, buffer) =
read_block(tx, root_node, &locator, self.branch.keys().read()).await?;
entry.insert(CachedBlock::from(buffer));
}
}
Ok(())
}
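
    /// Truncates the blob to `len` bytes and clamps the seek position accordingly. Extending a
    /// blob (`len > self.len()`) is not supported.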
pub fn truncate(&mut self, len: u64) -> Result<()> {
if len == self.len() {
return Ok(());
}
if len > self.len() {
return Err(Error::OperationNotSupported);
}
if self.seek_position() > len {
self.seek(SeekFrom::Start(len));
}
self.len_modified = len;
Ok(())
}
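
    /// Records the length header and all dirty blocks into `changeset`. Note the changeset still
    /// needs to be applied for the changes to become persistent.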
pub(crate) async fn flush(
&mut self,
tx: &mut ReadTransaction,
changeset: &mut Changeset,
) -> Result<()> {
self.write_len(tx, changeset).await?;
self.write_blocks(changeset);
Ok(())
}
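
    /// Marks all of this blob's blocks for removal in `changeset`.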
pub(crate) fn remove(self, changeset: &mut Changeset) {
let locators = Locator::head(self.id)
.sequence()
.take(self.block_count() as usize);
for locator in locators {
let encoded = locator.encode(self.branch().keys().read());
changeset.unlink_block(encoded, None);
}
}
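
    /// Ensures there is room in the cache for one more block, evicting a clean block if needed.
    /// Returns `false` if every cached block is dirty and nothing can be evicted.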
fn check_cache_capacity(&mut self) -> bool {
if self.cache.len() < BATCH_SIZE {
return true;
}
let number = self
.cache
.iter()
.find(|(_, block)| !block.dirty)
.map(|(number, _)| *number);
if let Some(number) = number {
self.cache.remove(&number);
true
} else {
false
}
}
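
    /// If the length changed since the last flush, writes it into the header of the head block.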
async fn write_len(
&mut self,
tx: &mut ReadTransaction,
changeset: &mut Changeset,
) -> Result<()> {
if self.len_modified == self.len_original {
return Ok(());
}
if let Some(block) = self.cache.get_mut(&0) {
block.content.write_u64(0, self.len_modified);
block.dirty = true;
} else {
let locator = Locator::head(self.id);
let root_node = tx
.load_latest_approved_root_node(self.branch.id(), RootNodeFilter::Any)
.await?;
let (_, mut content) =
read_block(tx, &root_node, &locator, self.branch.keys().read()).await?;
content.write_u64(0, self.len_modified);
write_block(changeset, &locator, content, self.branch.keys().read());
}
self.len_original = self.len_modified;
Ok(())
}
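
    /// Moves all dirty blocks out of the cache and records them in `changeset`.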
fn write_blocks(&mut self, changeset: &mut Changeset) {
let cache = mem::take(&mut self.cache);
let (dirty, clean): (HashMap<_, _>, _) =
cache.into_iter().partition(|(_, block)| block.dirty);
self.cache = clean;
for (number, block) in dirty {
let locator = Locator::head(self.id).nth(number);
write_block(
changeset,
&locator,
block.content,
self.branch.keys().read(),
);
}
}
}
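
// NOTE: The clone starts with an empty cache and its length reset to the last flushed length, so
// unflushed changes to the original are not visible through the clone.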
impl Clone for Blob {
fn clone(&self) -> Self {
Self {
branch: self.branch.clone(),
id: self.id,
cache: HashMap::default(),
len_original: self.len_original,
len_modified: self.len_original,
position: self.position,
}
}
}
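
/// A single block held in the in-memory cache, together with its dirty flag.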
#[derive(Default)]
struct CachedBlock {
content: BlockContent,
dirty: bool,
}
impl CachedBlock {
fn new() -> Self {
Self::default()
}
fn with_dirty(self, dirty: bool) -> Self {
Self { dirty, ..self }
}
}
impl From<BlockContent> for CachedBlock {
fn from(content: BlockContent) -> Self {
Self {
content,
dirty: false,
}
}
}
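
/// Forks the blob `blob_id` from `src_branch` into `dst_branch` by linking its existing blocks
/// into the destination branch (block contents are not copied), committing one write transaction
/// per `BATCH_SIZE` blocks. No-op when source and destination are the same branch.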
pub(crate) async fn fork(blob_id: BlobId, src_branch: &Branch, dst_branch: &Branch) -> Result<()> {
if src_branch.id() == dst_branch.id() {
return Ok(());
}
let read_key = src_branch.keys().read();
let write_keys = dst_branch.keys().write().ok_or(Error::PermissionDenied)?;
let end = {
let mut tx = src_branch.store().begin_read().await?;
let root_node = tx
.load_latest_approved_root_node(src_branch.id(), RootNodeFilter::Any)
.await?;
load_block_count_hint(&mut tx, &root_node, blob_id, src_branch.keys().read()).await?
};
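
    // Ties a write transaction to the changeset accumulated for it; one batch is applied and
    // committed per `BATCH_SIZE` forked blocks.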
struct Batch {
tx: crate::store::WriteTransaction,
changeset: Changeset,
}
impl Batch {
async fn apply(self, dst_branch_id: &PublicKey, write_keys: &Keypair) -> Result<()> {
let Batch { mut tx, changeset } = self;
changeset.apply(&mut tx, dst_branch_id, write_keys).await?;
tx.commit().await?;
Ok(())
}
}
let mut batch = None;
let locators = Locator::head(blob_id).sequence().take(end as usize);
for locator in locators {
let Batch { tx, changeset } = if let Some(batch) = &mut batch {
batch
} else {
batch.insert(Batch {
tx: src_branch.store().begin_write().await?,
changeset: Changeset::new(),
})
};
let encoded_locator = locator.encode(read_key);
let (block_id, block_presence) =
match tx.find_block(src_branch.id(), &encoded_locator).await {
Ok(id) => id,
Err(store::Error::LocatorNotFound) => {
break;
}
Err(error) => return Err(error.into()),
};
changeset.link_block(encoded_locator, block_id, block_presence);
        // Commit the accumulated changeset once per `BATCH_SIZE` blocks to keep transactions small.
        if (locator.number() + 1) as usize % BATCH_SIZE == 0 {
if let Some(batch) = batch.take() {
batch.apply(dst_branch.id(), write_keys).await?;
}
}
tracing::trace!(
num = locator.number(),
block_id = ?block_id,
?block_presence,
"fork block",
);
}
if let Some(batch) = batch {
batch.apply(dst_branch.id(), write_keys).await?;
}
Ok(())
}
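
/// Number of blocks needed to store `len` content bytes plus the header, i.e.
/// `ceil((len + HEADER_SIZE) / BLOCK_SIZE)`, saturated to `u32::MAX`.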
fn block_count(len: u64) -> u32 {
(1 + (len + HEADER_SIZE as u64 - 1) / BLOCK_SIZE as u64)
.try_into()
.unwrap_or(u32::MAX)
}
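
/// Reads the blob length from the header of its head block.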
async fn read_len(
tx: &mut ReadTransaction,
root_node: &RootNode,
blob_id: BlobId,
read_key: &cipher::SecretKey,
) -> Result<u64> {
let (_, buffer) = read_block(tx, root_node, &Locator::head(blob_id), read_key).await?;
Ok(buffer.read_u64(0))
}
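
/// Best-effort block count: if the head block isn't available locally, returns `u32::MAX` so
/// callers simply iterate until they run out of locators.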
async fn load_block_count_hint(
tx: &mut ReadTransaction,
root_node: &RootNode,
blob_id: BlobId,
read_key: &cipher::SecretKey,
) -> Result<u32> {
match read_len(tx, root_node, blob_id, read_key).await {
Ok(len) => Ok(block_count(len)),
Err(Error::Store(store::Error::BlockNotFound)) => Ok(u32::MAX),
Err(error) => Err(error),
}
}
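
/// Looks up the block at `locator`, reads it from the store and decrypts it.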
async fn read_block(
tx: &mut ReadTransaction,
root_node: &RootNode,
locator: &Locator,
read_key: &cipher::SecretKey,
) -> Result<(BlockId, BlockContent)> {
let (id, _) = tx
.find_block_at(root_node, &locator.encode(read_key))
.await?;
let mut content = BlockContent::new();
let nonce = tx.read_block(&id, &mut content).await?;
decrypt_block(read_key, &nonce, &mut content);
Ok((id, content))
}
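
/// Encrypts `content`, links the resulting block at `locator` in `changeset` and queues it for
/// writing.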
fn write_block(
changeset: &mut Changeset,
locator: &Locator,
mut content: BlockContent,
read_key: &cipher::SecretKey,
) -> BlockId {
let nonce = make_block_nonce(locator, &content, read_key);
encrypt_block(read_key, &nonce, &mut content);
let block = Block::new(content, nonce);
let block_id = block.id;
changeset.link_block(
locator.encode(read_key),
block.id,
SingleBlockPresence::Present,
);
changeset.write_block(block);
tracing::trace!(?locator, ?block_id, "write block");
block_id
}
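
/// Decrypts a block in place. Each block uses its own key, derived from the blob key and the
/// block nonce, which is why a constant (zero) cipher nonce suffices.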
fn decrypt_block(blob_key: &cipher::SecretKey, block_nonce: &BlockNonce, content: &mut [u8]) {
let block_key = SecretKey::derive_from_key(blob_key.as_array(), block_nonce);
block_key.decrypt_no_aead(&Nonce::default(), content);
}
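
/// Encrypts a block in place; the inverse of `decrypt_block`.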
fn encrypt_block(blob_key: &cipher::SecretKey, block_nonce: &BlockNonce, content: &mut [u8]) {
let block_key = SecretKey::derive_from_key(blob_key.as_array(), block_nonce);
block_key.encrypt_no_aead(&Nonce::default(), content);
}
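
/// Derives a deterministic block nonce by hashing the read key, the locator and the plaintext:
/// the same plaintext stored at the same locator always yields the same nonce and thus the same
/// encrypted block.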
fn make_block_nonce(
locator: &Locator,
plaintext_content: &[u8],
read_key: &cipher::SecretKey,
) -> BlockNonce {
(read_key.as_ref(), locator, plaintext_content)
.hash()
.into()
}