use crate::{
blob::{lock::UpgradableLock, Blob, BlockIds, ReadWriteError},
branch::Branch,
directory::{Directory, ParentContext},
error::{Error, Result},
protocol::{Bump, Locator, SingleBlockPresence, BLOCK_SIZE},
store::{Changeset, ReadTransaction},
version_vector::VersionVector,
};
use std::{fmt, future::Future, io::SeekFrom};
use tokio::io::{AsyncWrite, AsyncWriteExt};
/// Handle to a single file whose content is stored in a blob of a branch.
pub struct File {
// Blob holding the file content; reads, writes, seeking and truncation are delegated to it.
blob: Blob,
// Context used to reach the parent directory (opening it, forking it, bumping it on flush).
parent: ParentContext,
// Lock on the blob. It is created as a read lock and upgraded the first time the file is
// written to or truncated.
lock: UpgradableLock,
}
impl File {
/// Opens an existing file.
///
/// Waits for a read lock on the blob identified by `locator`, then opens that blob using a
/// fresh read transaction on the branch's store.
///
/// # Errors
///
/// Fails if the read transaction cannot be started or the blob cannot be opened.
pub(crate) async fn open(
    branch: Branch,
    locator: Locator,
    parent: ParentContext,
) -> Result<Self> {
    let blob_id = *locator.blob_id();

    // Take the lock first so the blob is protected before we start touching the store.
    let lock = UpgradableLock::Read(branch.locker().read(blob_id).await);

    let mut tx = branch.store().begin_read().await?;
    let blob = Blob::open(&mut tx, branch, blob_id).await?;

    Ok(Self { blob, parent, lock })
}
/// Creates a new, empty file backed by a freshly created blob.
///
/// # Panics
///
/// Panics with `"blob_id collision"` if a read lock on the blob id cannot be acquired
/// immediately.
pub(crate) fn create(branch: Branch, locator: Locator, parent: ParentContext) -> Self {
    let blob_id = *locator.blob_id();

    let read_lock = match branch.locker().try_read(blob_id) {
        Ok(guard) => guard,
        Err(_) => panic!("blob_id collision"),
    };

    Self {
        blob: Blob::create(branch, blob_id),
        parent,
        lock: UpgradableLock::Read(read_lock),
    }
}
/// Returns the branch the underlying blob belongs to.
pub fn branch(&self) -> &Branch {
self.blob.branch()
}
/// Opens the parent directory of this file, using a clone of this file's branch.
pub async fn parent(&self) -> Result<Directory> {
self.parent.open(self.branch().clone()).await
}
/// Returns the length of the file as reported by the underlying blob.
// The `allow` silences clippy's request for a matching `is_empty` method, which this type
// does not define.
#[allow(clippy::len_without_is_empty)]
pub fn len(&self) -> u64 {
self.blob.len()
}
/// Returns a future that resolves to the number of bytes of this file whose blocks are
/// present.
///
/// The future walks the block ids of the underlying blob, counts those reported as
/// `Present`, multiplies the count by `BLOCK_SIZE` and caps the result at the file length.
/// It captures a clone of the branch and copies of the blob id and length, so it works on
/// the values observed at the time `progress` was called.
///
/// # Errors
///
/// The future fails if the block ids cannot be opened or iterated.
pub fn progress(&self) -> impl Future<Output = Result<u64>> {
    let branch = self.branch().clone();
    let blob_id = *self.blob.id();
    let len = self.len();

    async move {
        let mut block_ids = BlockIds::open(branch, blob_id).await?;

        // Explicitly `u64`: an untyped integer literal would fall back to `i32`, which could
        // overflow while counting before it is ever widened.
        let mut present: u64 = 0;

        while let Some((_, block_presence)) = block_ids.try_next().await? {
            match block_presence {
                SingleBlockPresence::Present => present += 1,
                SingleBlockPresence::Missing | SingleBlockPresence::Expired => (),
            }
        }

        // Saturate instead of overflowing; the result is capped at `len` anyway.
        Ok(present.saturating_mul(BLOCK_SIZE as u64).min(len))
    }
}
/// Reads data from the current position into `buffer` and returns the number of bytes read.
///
/// The read is served by the underlying blob. When the blob reports a cache miss, it is
/// warmed up from a read transaction and the read is retried; when it reports a full cache,
/// the file is flushed first and the read is retried.
///
/// # Errors
///
/// Fails if the read transaction cannot be started, the warm-up fails, or the flush fails.
pub async fn read(&mut self, buffer: &mut [u8]) -> Result<usize> {
    loop {
        let failure = match self.blob.read(buffer) {
            Ok(count) => return Ok(count),
            Err(failure) => failure,
        };

        match failure {
            ReadWriteError::CacheMiss => {
                let mut tx = self.branch().store().begin_read().await?;
                self.blob.warmup(&mut tx).await?;
            }
            ReadWriteError::CacheFull => self.flush().await?,
        }
    }
}
/// Reads repeatedly into `buffer` until a read yields zero bytes, and returns the total
/// number of bytes placed into `buffer`.
///
/// Each round reads into the still-unfilled tail of `buffer`, so the loop also ends once the
/// buffer is full, provided a read into an empty slice yields zero.
///
/// # Errors
///
/// Propagates the first error returned by `read`.
pub async fn read_all(&mut self, buffer: &mut [u8]) -> Result<usize> {
    let mut filled = 0;

    loop {
        let count = self.read(&mut buffer[filled..]).await?;

        if count == 0 {
            return Ok(filled);
        }

        filled += count;
    }
}
/// Reads everything from the current seek position to the end of the file and returns it.
///
/// The buffer is sized from the blob's length and seek position. The returned vector
/// contains exactly the bytes that were read: if fewer bytes than expected are delivered,
/// the unused tail is dropped instead of being returned as zero padding.
///
/// # Errors
///
/// Propagates any error returned by `read_all`.
pub async fn read_to_end(&mut self) -> Result<Vec<u8>> {
    // Saturating so that a position beyond the length (if that can ever happen) yields an
    // empty read rather than an arithmetic overflow.
    let remaining = self.blob.len().saturating_sub(self.blob.seek_position());
    let mut buffer = vec![0; remaining.try_into().unwrap_or(usize::MAX)];

    let len = self.read_all(&mut buffer).await?;
    buffer.truncate(len);

    Ok(buffer)
}
/// Writes data from `buffer` at the current position and returns the number of bytes
/// written.
///
/// The write lock is acquired first. The write is then handed to the underlying blob: on a
/// cache miss the blob is warmed up from a read transaction and the write is retried; on a
/// full cache the file is flushed and the write is retried.
///
/// # Errors
///
/// Returns `Error::Locked` if the lock cannot be upgraded. Also fails if the read
/// transaction cannot be started, the warm-up fails, or the flush fails.
pub async fn write(&mut self, buffer: &[u8]) -> Result<usize> {
    self.acquire_write_lock()?;

    loop {
        let failure = match self.blob.write(buffer) {
            Ok(written) => return Ok(written),
            Err(failure) => failure,
        };

        match failure {
            ReadWriteError::CacheMiss => {
                let mut tx = self.branch().store().begin_read().await?;
                self.blob.warmup(&mut tx).await?;
            }
            ReadWriteError::CacheFull => self.flush().await?,
        }
    }
}
/// Writes `buffer` by calling `write` repeatedly on the not-yet-written remainder until a
/// call reports zero bytes written.
///
/// `write` is invoked at least once, even for an empty `buffer`.
///
/// # Errors
///
/// Propagates the first error returned by `write`.
pub async fn write_all(&mut self, buffer: &[u8]) -> Result<()> {
    let mut written = 0;

    loop {
        let count = self.write(&buffer[written..]).await?;

        if count == 0 {
            return Ok(());
        }

        written += count;
    }
}
/// Seeks within the underlying blob and returns the value the blob reports.
// NOTE(review): the returned value is presumably the new position - confirm against
// `Blob::seek`.
pub fn seek(&mut self, pos: SeekFrom) -> u64 {
self.blob.seek(pos)
}
/// Truncates the underlying blob to `len` after acquiring the write lock.
///
/// # Errors
///
/// Returns `Error::Locked` if the lock cannot be upgraded, otherwise whatever the blob's
/// `truncate` returns.
pub fn truncate(&mut self, len: u64) -> Result<()> {
self.acquire_write_lock()?;
self.blob.truncate(len)
}
/// Persists the pending modifications of this file.
///
/// Does nothing when the blob reports that it is not dirty. Otherwise, inside a single write
/// transaction, it flushes the blob into a changeset, bumps the parent context with an
/// increment for this branch's id, applies the changeset using the branch's write keys and
/// commits. The branch's notifier is invoked through `commit_and_then`.
///
/// # Errors
///
/// Returns `Error::PermissionDenied` if the branch has no write keys. Also fails if any of
/// the store operations fail.
pub async fn flush(&mut self) -> Result<()> {
// The blob has no unsaved changes, so there is nothing to persist.
if !self.blob.is_dirty() {
return Ok(());
}
let mut tx = self.branch().store().begin_write().await?;
let mut changeset = Changeset::new();
// Stage the blob's pending changes in the changeset.
self.blob.flush(&mut tx, &mut changeset).await?;
// Record the modification on the parent side as an increment for this branch.
self.parent
.bump(
&mut tx,
&mut changeset,
self.branch().clone(),
Bump::increment(*self.branch().id()),
)
.await?;
// Applying the staged changes requires write keys; without them the flush is refused.
changeset
.apply(
&mut tx,
self.branch().id(),
self.branch()
.keys()
.write()
.ok_or(Error::PermissionDenied)?,
)
.await?;
// Hand the notification to the transaction so it is tied to the commit.
let event_tx = self.branch().notify();
tx.commit_and_then(move || event_tx.send()).await?;
Ok(())
}
/// Flushes the blob's pending changes into the given transaction and changeset.
///
/// Unlike `flush`, this neither bumps the parent, applies the changeset nor commits
/// anything; it only calls the blob's `flush` with the supplied `tx` and `changeset`.
pub(crate) async fn save(
&mut self,
tx: &mut ReadTransaction,
changeset: &mut Changeset,
) -> Result<()> {
self.blob.flush(tx, changeset).await?;
Ok(())
}
/// Copies the content of this file, from the current seek position to the end, into `dst`.
///
/// The data is transferred in chunks of up to `BLOCK_SIZE` bytes.
///
/// # Errors
///
/// Propagates read errors from this file. Errors from writing to `dst` are returned as
/// `Error::Writer`.
pub async fn copy_to_writer<W: AsyncWrite + Unpin>(&mut self, dst: &mut W) -> Result<()> {
    let mut buffer = vec![0; BLOCK_SIZE];

    loop {
        // A single `read` is not guaranteed to fill the buffer (which is why `read_all`
        // loops), so a short `read` must not be mistaken for the end of the file. `read_all`
        // only comes up short once a read yields zero bytes.
        let len = self.read_all(&mut buffer).await?;
        dst.write_all(&buffer[..len]).await.map_err(Error::Writer)?;

        if len < buffer.len() {
            break;
        }
    }

    Ok(())
}
/// Re-targets this file handle to `dst_branch`.
///
/// When the file already belongs to `dst_branch` (same branch id) nothing happens.
/// Otherwise the parent context is forked into `dst_branch`, a read lock on the blob id is
/// taken in `dst_branch`, the blob is opened there, and `self` is replaced with a handle
/// built from those parts.
///
/// # Errors
///
/// Fails if forking the parent, starting the read transaction, or opening the blob fails.
/// In that case `self` is left unchanged.
pub async fn fork(&mut self, dst_branch: Branch) -> Result<()> {
    if self.branch().id() != dst_branch.id() {
        let blob_id = *self.blob.id();

        let parent = self.parent.fork(self.branch(), &dst_branch).await?;
        let lock = UpgradableLock::Read(dst_branch.locker().read(blob_id).await);

        let mut tx = dst_branch.store().begin_read().await?;
        let blob = Blob::open(&mut tx, dst_branch, blob_id).await?;
        // Close the transaction before the old handle is dropped by the assignment below.
        drop(tx);

        *self = Self { blob, parent, lock };
    }

    Ok(())
}
/// Returns the version vector of this file's entry, obtained from the parent context using a
/// clone of this file's branch.
pub async fn version_vector(&self) -> Result<VersionVector> {
self.parent
.entry_version_vector(self.branch().clone())
.await
}
/// Test-only accessor for the id of the underlying blob.
#[cfg(test)]
pub(crate) fn blob_id(&self) -> &crate::blob::BlobId {
self.blob.id()
}
fn acquire_write_lock(&mut self) -> Result<()> {
self.lock.upgrade().then_some(()).ok_or(Error::Locked)
}
}
/// Debug output shows only the identifiers of the file: the blob id and the id of the branch
/// the blob belongs to.
impl fmt::Debug for File {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("File");
        out.field("blob_id", &self.blob.id());
        out.field("branch", &self.blob.branch().id());
        out.finish()
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
access_control::{AccessKeys, WriteSecrets},
branch::BranchShared,
crypto::sign::PublicKey,
db,
directory::{DirectoryFallback, DirectoryLocking},
event::EventSender,
store::Store,
test_utils,
};
use assert_matches::assert_matches;
use tempfile::TempDir;
// Forking a file into another branch must leave the copy in the source branch untouched,
// while writes made through the forked handle end up in the destination branch.
#[tokio::test(flavor = "multi_thread")]
async fn fork() {
test_utils::init_log();
let (_base_dir, [branch0, branch1]) = setup().await;
// Create the file in branch 0 and persist its initial content.
let mut file0 = branch0.ensure_file_exists("dog.jpg".into()).await.unwrap();
file0.write_all(b"small").await.unwrap();
file0.flush().await.unwrap();
drop(file0);
// Reopen the file from branch 0 ...
let mut file1 = branch0
.open_root(DirectoryLocking::Enabled, DirectoryFallback::Disabled)
.await
.unwrap()
.lookup("dog.jpg")
.unwrap()
.file()
.unwrap()
.open()
.await
.unwrap();
// ... fork it into branch 1 and modify it through the forked handle.
file1.fork(branch1.clone()).await.unwrap();
file1.write_all(b"large").await.unwrap();
file1.flush().await.unwrap();
// The file in branch 0 still has the original content.
let mut file = branch0
.open_root(DirectoryLocking::Enabled, DirectoryFallback::Disabled)
.await
.unwrap()
.lookup("dog.jpg")
.unwrap()
.file()
.unwrap()
.open()
.await
.unwrap();
assert_eq!(file.read_to_end().await.unwrap(), b"small");
// The file in branch 1 has the content written after the fork.
let mut file = branch1
.open_root(DirectoryLocking::Enabled, DirectoryFallback::Disabled)
.await
.unwrap()
.lookup("dog.jpg")
.unwrap()
.file()
.unwrap()
.open()
.await
.unwrap();
assert_eq!(file.read_to_end().await.unwrap(), b"large");
}
// A forked file must accept several write + flush rounds in a row without failing.
#[tokio::test(flavor = "multi_thread")]
async fn multiple_consecutive_modifications_of_forked_file() {
let (_base_dir, [branch0, branch1]) = setup().await;
// Create the file in branch 0.
let mut file0 = branch0.ensure_file_exists("/pig.jpg".into()).await.unwrap();
file0.flush().await.unwrap();
// Open a second handle to it from branch 0 and fork that handle into branch 1.
let mut file1 = branch0
.open_root(DirectoryLocking::Enabled, DirectoryFallback::Disabled)
.await
.unwrap()
.lookup("pig.jpg")
.unwrap()
.file()
.unwrap()
.open()
.await
.unwrap();
file1.fork(branch1).await.unwrap();
// Every round must succeed; the test only checks for the absence of errors.
for _ in 0..2 {
file1.write_all(b"oink").await.unwrap();
file1.flush().await.unwrap();
}
}
// While one handle has written to a file, modifications through a second handle to the same
// file in the same branch must be rejected with `Error::Locked`.
#[tokio::test(flavor = "multi_thread")]
async fn concurrent_writes() {
let (_base_dir, [branch]) = setup().await;
// Two independent handles to the same file.
let mut file0 = branch.ensure_file_exists("fox.txt".into()).await.unwrap();
let mut file1 = branch
.open_root(DirectoryLocking::Enabled, DirectoryFallback::Disabled)
.await
.unwrap()
.lookup("fox.txt")
.unwrap()
.file()
.unwrap()
.open()
.await
.unwrap();
// The first handle writes successfully ...
file0.write_all(b"yip-yap").await.unwrap();
// ... after which both writing and truncating through the second handle are refused.
assert_matches!(file1.write_all(b"ring-ding-ding").await, Err(Error::Locked));
assert_matches!(file1.truncate(0), Err(Error::Locked));
}
// Copying a file into a writer on the regular filesystem must reproduce its content exactly.
#[tokio::test(flavor = "multi_thread")]
async fn copy_to_writer() {
    use tokio::{fs, io::AsyncReadExt};

    let (base_dir, [branch]) = setup().await;
    let src_content = b"hello world";

    // Fill the source file with the expected content.
    let mut src = branch.ensure_file_exists("src.txt".into()).await.unwrap();
    src.write_all(src_content).await.unwrap();

    let dst_path = base_dir.path().join("dst.txt");

    // Copy into a regular file; the handle is closed at the end of this scope.
    {
        let mut sink = fs::File::create(&dst_path).await.unwrap();
        // Rewind so that the copy starts at the beginning of the source.
        src.seek(SeekFrom::Start(0));
        src.copy_to_writer(&mut sink).await.unwrap();
        sink.sync_all().await.unwrap();
    }

    // Read the copy back through a fresh handle and compare.
    let mut reopened = fs::File::open(&dst_path).await.unwrap();
    let mut dst_content = Vec::new();
    reopened.read_to_end(&mut dst_content).await.unwrap();

    assert_eq!(dst_content, src_content);
}
// Creates a temporary database and `N` branches that all share the same store, access keys,
// event sender and `BranchShared` state. The returned `TempDir` is handed to the caller
// together with the branches.
async fn setup<const N: usize>() -> (TempDir, [Branch; N]) {
    let (base_dir, pool) = db::create_temp().await.unwrap();

    let store = Store::new(pool);
    let keys = AccessKeys::from(WriteSecrets::random());
    let event_tx = EventSender::new(1);
    let shared = BranchShared::new();

    // Every branch receives its own clones of the shared components.
    let new_branch = |()| {
        create_branch(
            store.clone(),
            event_tx.clone(),
            keys.clone(),
            shared.clone(),
        )
    };

    (base_dir, [(); N].map(new_branch))
}
// Builds a branch with a randomly generated id from the given components.
fn create_branch(
    store: Store,
    event_tx: EventSender,
    keys: AccessKeys,
    shared: BranchShared,
) -> Branch {
    Branch::new(PublicKey::random(), store, keys, shared, event_tx)
}
}