Skip to content

Commit

Permalink
[apache#6279] feat (gvfs-fuse): Add gvfs-fuse integration tests for b…
Browse files Browse the repository at this point in the history
…ig files and open-file flag test cases (apache#6280)

### What changes were proposed in this pull request?

Add gvfs-fuse integration tests for big files and open-file flag test
cases

### Why are the changes needed?

Fix: apache#6279

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

IT
  • Loading branch information
diqiu50 authored Feb 5, 2025
1 parent 316efc2 commit 71998d9
Show file tree
Hide file tree
Showing 8 changed files with 587 additions and 121 deletions.
2 changes: 2 additions & 0 deletions clients/filesystem-fuse/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -71,5 +71,7 @@ test-s3:
test: doc-test
cargo test --no-fail-fast --all-targets --all-features --workspace

test-all: test test-s3 test-fuse-it

clean:
cargo clean
20 changes: 13 additions & 7 deletions clients/filesystem-fuse/src/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ pub trait FileWriter: Sync + Send {
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use libc::{O_APPEND, O_CREAT, O_RDONLY};
use libc::{O_CREAT, O_RDONLY, O_WRONLY};
use std::collections::HashMap;
use std::path::Component;

Expand Down Expand Up @@ -461,7 +461,11 @@ pub(crate) mod tests {

// Test create file
let file_handle = self
.test_create_file(parent_file_id, "file1.txt".as_ref())
.test_create_file(
parent_file_id,
"file1.txt".as_ref(),
(O_CREAT | O_WRONLY) as u32,
)
.await;

// Test write file
Expand Down Expand Up @@ -545,11 +549,13 @@ pub(crate) mod tests {
self.files.insert(file_stat.file_id, file_stat);
}

async fn test_create_file(&mut self, root_file_id: u64, name: &OsStr) -> FileHandle {
let file = self
.fs
.create_file(root_file_id, name, (O_CREAT | O_APPEND) as u32)
.await;
async fn test_create_file(
&mut self,
root_file_id: u64,
name: &OsStr,
flags: u32,
) -> FileHandle {
let file = self.fs.create_file(root_file_id, name, flags).await;
assert!(file.is_ok());
let file = file.unwrap();
assert!(file.handle_id > 0);
Expand Down
2 changes: 1 addition & 1 deletion clients/filesystem-fuse/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use tokio::signal;

#[tokio::main]
async fn main() -> fuse3::Result<()> {
tracing_subscriber::fmt().init();
tracing_subscriber::fmt::init();

// todo need inmprove the args parsing
let args: Vec<String> = std::env::args().collect();
Expand Down
35 changes: 18 additions & 17 deletions clients/filesystem-fuse/src/memory_filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl PathFileSystem for MemoryFileSystem {
Ok(results)
}

async fn open_file(&self, path: &Path, _flags: OpenFileFlags) -> Result<OpenedFile> {
async fn open_file(&self, path: &Path, flags: OpenFileFlags) -> Result<OpenedFile> {
let file_stat = self.stat(path).await?;
let mut opened_file = OpenedFile::new(file_stat);
match opened_file.file_stat.kind {
Expand All @@ -105,8 +105,17 @@ impl PathFileSystem for MemoryFileSystem {
.unwrap()
.data
.clone();
opened_file.reader = Some(Box::new(MemoryFileReader { data: data.clone() }));
opened_file.writer = Some(Box::new(MemoryFileWriter { data: data }));
if flags.is_read() {
opened_file.reader = Some(Box::new(MemoryFileReader { data: data.clone() }));
}
if flags.is_write() || flags.is_append() || flags.is_truncate() {
opened_file.writer = Some(Box::new(MemoryFileWriter { data: data.clone() }));
}

if flags.is_truncate() {
let mut data = data.lock().unwrap();
data.clear();
}
Ok(opened_file)
}
_ => Err(Errno::from(libc::EBADF)),
Expand All @@ -117,27 +126,19 @@ impl PathFileSystem for MemoryFileSystem {
self.open_file(path, flags).await
}

async fn create_file(&self, path: &Path, _flags: OpenFileFlags) -> Result<OpenedFile> {
let mut file_map = self.file_map.write().unwrap();
if file_map.contains_key(path) {
async fn create_file(&self, path: &Path, flags: OpenFileFlags) -> Result<OpenedFile> {
if self.file_map.read().unwrap().contains_key(path) && flags.is_exclusive() {
return Err(Errno::from(libc::EEXIST));
}

let mut opened_file = OpenedFile::new(FileStat::new_file_filestat_with_path(path, 0));

let data = Arc::new(Mutex::new(Vec::new()));
file_map.insert(
opened_file.file_stat.path.clone(),
self.file_map.write().unwrap().insert(
path.to_path_buf(),
MemoryFile {
kind: RegularFile,
data: data.clone(),
data: Arc::new(Mutex::new(Vec::new())),
},
);

opened_file.reader = Some(Box::new(MemoryFileReader { data: data.clone() }));
opened_file.writer = Some(Box::new(MemoryFileWriter { data: data }));

Ok(opened_file)
self.open_file(path, flags).await
}

async fn create_dir(&self, path: &Path) -> Result<FileStat> {
Expand Down
142 changes: 127 additions & 15 deletions clients/filesystem-fuse/src/open_dal_filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,26 @@ use bytes::Bytes;
use fuse3::FileType::{Directory, RegularFile};
use fuse3::{Errno, FileType, Timestamp};
use log::error;
use opendal::{EntryMode, ErrorKind, Metadata, Operator};
use opendal::{Buffer, EntryMode, ErrorKind, Metadata, Operator};
use std::mem::swap;
use std::path::{Path, PathBuf};
use std::time::SystemTime;

pub(crate) struct OpenDalFileSystem {
op: Operator,
block_size: u32,
}

impl OpenDalFileSystem {}

impl OpenDalFileSystem {
pub(crate) fn new(op: Operator, _config: &AppConfig, _fs_context: &FileSystemContext) -> Self {
Self { op: op }
const WRITE_BUFFER_SIZE: usize = 5 * 1024 * 1024;

pub(crate) fn new(op: Operator, config: &AppConfig, _fs_context: &FileSystemContext) -> Self {
Self {
op: op,
block_size: config.filesystem.block_size,
}
}

fn opendal_meta_to_file_stat(&self, meta: &Metadata, file_stat: &mut FileStat) {
Expand Down Expand Up @@ -120,14 +127,30 @@ impl PathFileSystem for OpenDalFileSystem {
.map_err(opendal_error_to_errno)?;
file.reader = Some(Box::new(FileReaderImpl { reader }));
}
if flags.is_write() || flags.is_create() || flags.is_append() || flags.is_truncate() {
if !flags.is_create() && flags.is_append() {
error!("The file system does not support open a exists file with the append mode");
return Err(Errno::from(libc::EINVAL));
}

if flags.is_truncate() {
self.op
.write(&file_name, Buffer::new())
.await
.map_err(opendal_error_to_errno)?;
}

if flags.is_write() || flags.is_append() || flags.is_truncate() {
let writer = self
.op
.writer_with(&file_name)
.await
.map_err(opendal_error_to_errno)?;
file.writer = Some(Box::new(FileWriterImpl { writer }));
file.writer = Some(Box::new(FileWriterImpl::new(
writer,
Self::WRITE_BUFFER_SIZE + self.block_size as usize,
)));
}

Ok(file)
}

Expand All @@ -141,15 +164,17 @@ impl PathFileSystem for OpenDalFileSystem {

async fn create_file(&self, path: &Path, flags: OpenFileFlags) -> Result<OpenedFile> {
let file_name = path.to_string_lossy().to_string();
if flags.is_exclusive() {
let meta = self.op.stat(&file_name).await;
if meta.is_ok() {
return Err(Errno::from(libc::EEXIST));
}
}

let mut writer = self
.op
.writer_with(&file_name)
self.op
.write(&file_name, Buffer::new())
.await
.map_err(opendal_error_to_errno)?;

writer.close().await.map_err(opendal_error_to_errno)?;

let file = self.open_file(path, flags).await?;
Ok(file)
}
Expand Down Expand Up @@ -210,19 +235,45 @@ impl FileReader for FileReaderImpl {

struct FileWriterImpl {
writer: opendal::Writer,
buffer: Vec<u8>,
buffer_size: usize,
}

impl FileWriterImpl {
fn new(writer: opendal::Writer, buffer_size: usize) -> Self {
Self {
writer,
buffer_size: buffer_size,
buffer: Vec::with_capacity(buffer_size),
}
}
}

#[async_trait]
impl FileWriter for FileWriterImpl {
async fn write(&mut self, _offset: u64, data: &[u8]) -> Result<u32> {
self.writer
.write(data.to_vec())
.await
.map_err(opendal_error_to_errno)?;
if self.buffer.len() > OpenDalFileSystem::WRITE_BUFFER_SIZE {
let mut new_buffer: Vec<u8> = Vec::with_capacity(self.buffer_size);
swap(&mut new_buffer, &mut self.buffer);

self.writer
.write(new_buffer)
.await
.map_err(opendal_error_to_errno)?;
}
self.buffer.extend(data);
Ok(data.len() as u32)
}

async fn close(&mut self) -> Result<()> {
if !self.buffer.is_empty() {
let mut new_buffer: Vec<u8> = vec![];
swap(&mut new_buffer, &mut self.buffer);
self.writer
.write(new_buffer)
.await
.map_err(opendal_error_to_errno)?;
}
self.writer.close().await.map_err(opendal_error_to_errno)?;
Ok(())
}
Expand Down Expand Up @@ -260,10 +311,12 @@ fn opendal_filemode_to_filetype(mode: EntryMode) -> FileType {
#[cfg(test)]
mod test {
use crate::config::AppConfig;
use crate::open_dal_filesystem::OpenDalFileSystem;
use crate::s3_filesystem::extract_s3_config;
use crate::s3_filesystem::tests::s3_test_config;
use crate::test_enable_with;
use crate::RUN_TEST_WITH_S3;
use bytes::Buf;
use opendal::layers::LoggingLayer;
use opendal::{services, Builder, Operator};

Expand Down Expand Up @@ -327,4 +380,63 @@ mod test {
}
}
}

#[tokio::test]
async fn s3_ut_test_s3_write() {
test_enable_with!(RUN_TEST_WITH_S3);
let config = s3_test_config();

let op = create_opendal(&config);
let path = "/s1/fileset1/gvfs_test/test_dir/test_file";
let mut writer = op.writer_with(path).await.unwrap();

let mut buffer: Vec<u8> = vec![];
let num_batch = 10 * 1024;
for i in 0..num_batch {
let data = vec![i as u8; num_batch];
buffer.extend(&data);

if buffer.len() > OpenDalFileSystem::WRITE_BUFFER_SIZE {
writer.write(buffer).await.unwrap();
buffer = vec![];
};
}

if !buffer.is_empty() {
writer.write(buffer).await.unwrap();
}
writer.close().await.unwrap();
}

#[tokio::test]
async fn s3_ut_test_s3_read() {
test_enable_with!(RUN_TEST_WITH_S3);
let config = s3_test_config();

let op = create_opendal(&config);
let path = "/s1/fileset1/test_dir/test_big_file";
let meta = op.stat(path).await;
if meta.is_err() {
println!("stat error: {:?}", meta.err());
return;
}
let reader = op.reader(path).await.unwrap();

let mut buffer = Vec::new();

let batch_size = 1024;
let mut start = 0;
let mut end = batch_size;
loop {
let buf = reader.read(start..end).await.unwrap();
if buf.is_empty() {
break;
}
buffer.extend_from_slice(buf.chunk());
start = end;
end += batch_size;
}

println!("Read {} bytes.", buffer.len());
}
}
11 changes: 10 additions & 1 deletion clients/filesystem-fuse/tests/bin/run_fuse_testers.sh
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,22 @@ if [ "$1" == "test" ]; then
echo "Running tests..."
cd $CLIENT_FUSE_DIR
export RUN_TEST_WITH_FUSE=1
cargo test --test fuse_test fuse_it_
cargo test --test fuse_test fuse_it_ -- weak_consistency

elif [ "$1" == "start" ]; then
# Start the servers
echo "Starting servers..."
start_servers

elif [ "$1" == "restart" ]; then
# Stop the servers
echo "Stopping servers..."
stop_servers

# Start the servers
echo "Starting servers..."
start_servers

elif [ "$1" == "stop" ]; then
# Stop the servers
echo "Stopping servers..."
Expand Down
9 changes: 9 additions & 0 deletions clients/filesystem-fuse/tests/bin/run_s3fs_testers.sh
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ elif [ "$1" == "start" ]; then
echo "Starting servers..."
start_servers

elif [ "$1" == "restart" ]; then
# Stop the servers
echo "Stopping servers..."
stop_servers

# Start the servers
echo "Starting servers..."
start_servers

elif [ "$1" == "stop" ]; then
# Stop the servers
echo "Stopping servers..."
Expand Down
Loading

0 comments on commit 71998d9

Please sign in to comment.