diff --git a/clients/filesystem-fuse/Cargo.toml b/clients/filesystem-fuse/Cargo.toml index d559d207312..830693203d4 100644 --- a/clients/filesystem-fuse/Cargo.toml +++ b/clients/filesystem-fuse/Cargo.toml @@ -39,7 +39,14 @@ futures-util = "0.3.30" fuse3 = { version = "0.8.1", "features" = ["tokio-runtime", "unprivileged"] } libc = "0.2.164" log = "0.4.22" +opendal = { version = "0.46.0", features = ["services-s3"] } tokio = { version = "1.38.0", features = ["full"] } tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } regex = "1.11.1" -async-trait = "0.1" \ No newline at end of file +async-trait = "0.1" +reqwest = { version = "0.12.9", features = ["json"] } +serde = { version = "1.0.215", features = ["derive"] } +urlencoding = "2.1.3" + +[dev-dependencies] +mockito = "0.31" \ No newline at end of file diff --git a/clients/filesystem-fuse/src/cloud_storage_filesystem.rs b/clients/filesystem-fuse/src/cloud_storage_filesystem.rs new file mode 100644 index 00000000000..4b7f181169b --- /dev/null +++ b/clients/filesystem-fuse/src/cloud_storage_filesystem.rs @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use crate::filesystem::{ + FileReader, FileStat, FileSystemCapacity, FileWriter, OpenFileFlags, OpenedFile, + PathFileSystem, Result, +}; +use crate::filesystem_metadata::DefaultFileSystemMetadata; +use crate::utils::join_file_path; +use async_trait::async_trait; +use bytes::Bytes; +use fuse3::{Errno, FileType, Timestamp}; +use futures_util::{TryFutureExt, TryStreamExt}; +use log::debug; +use opendal::layers::LoggingLayer; +use opendal::{services, EntryMode, ErrorKind, Metadata, Operator}; +use std::ops::Range; +use std::sync::{Mutex, RwLock}; +use std::time::SystemTime; + +pub(crate) struct CloudStorageFileSystem { + op: Operator, +} + +impl CloudStorageFileSystem { + pub fn new(op: Operator) -> Self { + Self { + op: op, + } + } +} + +#[async_trait] +impl PathFileSystem for CloudStorageFileSystem { + async fn init(&self) {} + + async fn stat(&self, name: &str) -> Result { + let meta = self.op.stat(name).await.map_err(opendal_error_to_errno)?; + let mut file_stat = FileStat::new_file_with_path(name, 0); + opdal_meta_to_file_stat(&meta, &mut file_stat); + Ok(file_stat) + } + + async fn lookup(&self, parent: &str, name: &str) -> Result { + let path = join_file_path(parent, name); + self.stat(&path).await + } + + async fn read_dir(&self, name: &str) -> Result> { + let entries = self.op.list(name).await.map_err(opendal_error_to_errno)?; + entries + .iter() + .map(|entry| { + let path = entry.path().trim_end_matches('/'); + let mut file_stat = FileStat::new_file_with_path(&path, 0); + opdal_meta_to_file_stat(entry.metadata(), &mut file_stat); + debug!("read dir file stat: {:?}", file_stat); + Ok(file_stat) + }) + .collect() + } + + async fn open_file(&self, name: &str, flags: OpenFileFlags) -> Result { + let file_stat = self.stat(name).await?; + debug_assert!(file_stat.kind == FileType::RegularFile); + let mut file = OpenedFile::new(file_stat); + if flags.is_read() { + let reader = self + .op + .reader_with(name) + .await + .map_err(opendal_error_to_errno)?; + file.reader = Some(Box::new(FileReaderImpl { reader })); + } + if flags.is_write() { + let writer = self + .op + .writer_with(name) + .await + .map_err(opendal_error_to_errno)?; + file.writer = Some(Box::new(FileWriterImpl { writer })); + } + Ok(file) + } + + async fn open_dir(&self, name: &str, flags: OpenFileFlags) -> Result { + let file_stat = self.stat(name).await?; + debug_assert!(file_stat.kind == FileType::Directory); + let mut file = OpenedFile::new(file_stat); + Ok(file) + } + + async fn create_file( + &self, + parent: &str, + name: &str, + flags: OpenFileFlags, + ) -> Result { + let mut file = OpenedFile::new(FileStat::new_file_with_path(name, 0)); + + if flags.is_read() { + let reader = self + .op + .reader_with(name) + .await + .map_err(opendal_error_to_errno)?; + file.reader = Some(Box::new(FileReaderImpl { reader })); + } + if flags.is_write() { + let writer = self + .op + .writer_with(name) + .await + .map_err(opendal_error_to_errno)?; + file.writer = Some(Box::new(FileWriterImpl { writer })); + } + Ok(file) + } + + async fn create_dir(&self, parent: &str, name: &str) -> Result { + let path = join_file_path(parent, name); + self.op + .create_dir(&path) + .await + .map_err(opendal_error_to_errno)?; + let file_stat = self.stat(&path).await?; + Ok(OpenedFile::new(file_stat)) + } + + async fn set_attr(&self, name: &str, file_stat: &FileStat, flush: bool) -> Result<()> { + Ok(()) + } + + async fn remove_file(&self, parent: &str, name: &str) -> Result<()> { + self.op + .remove(vec![join_file_path(parent, name)]) + .await + .map_err(opendal_error_to_errno) + } + + async fn remove_dir(&self, parent: &str, name: &str) -> Result<()> { + //todo:: need to consider keeping the behavior of posix remove dir when the dir is not empty + self.op + .remove_all(&join_file_path(parent, name)) + .await + .map_err(opendal_error_to_errno) + } + + fn get_capacity(&self) -> Result { + Ok(FileSystemCapacity {}) + } +} + +struct FileReaderImpl { + reader: opendal::Reader, +} + +#[async_trait] +impl FileReader for FileReaderImpl { + async fn read(&mut self, offset: u64, size: u32) -> Result { + let end = offset + size as u64; + let v = self + .reader + .read(offset..end) + .await + .map_err(opendal_error_to_errno)?; + Ok(v.to_bytes()) + } +} + +struct FileWriterImpl { + writer: opendal::Writer, +} + +#[async_trait] +impl FileWriter for FileWriterImpl { + async fn write(&mut self, offset: u64, data: &[u8]) -> Result { + self.writer + .write(data.to_vec()) + .await + .map_err(opendal_error_to_errno)?; + Ok(data.len() as u32) + } + + async fn close(&mut self) -> Result<()> { + self.writer.close().await.map_err(opendal_error_to_errno)?; + Ok(()) + } +} + +fn opendal_error_to_errno(err: opendal::Error) -> fuse3::Errno { + debug!("opendal_error2errno: {:?}", err); + match err.kind() { + ErrorKind::Unsupported => Errno::from(libc::EOPNOTSUPP), + ErrorKind::IsADirectory => Errno::from(libc::EISDIR), + ErrorKind::NotFound => Errno::from(libc::ENOENT), + ErrorKind::PermissionDenied => Errno::from(libc::EACCES), + ErrorKind::AlreadyExists => Errno::from(libc::EEXIST), + ErrorKind::NotADirectory => Errno::from(libc::ENOTDIR), + ErrorKind::RateLimited => Errno::from(libc::EBUSY), + _ => Errno::from(libc::ENOENT), + } +} + +fn opendal_filemode_to_filetype(mode: EntryMode) -> FileType { + match mode { + EntryMode::DIR => FileType::Directory, + _ => FileType::RegularFile, + } +} + +fn opdal_meta_to_file_stat(meta: &Metadata, file_stat: &mut FileStat) { + let now = SystemTime::now(); + let mtime = meta.last_modified().map(|x| x.into()).unwrap_or(now); + + file_stat.size = meta.content_length(); + file_stat.kind = opendal_filemode_to_filetype(meta.mode()); + file_stat.ctime = Timestamp::from(mtime); + file_stat.atime = Timestamp::from(now); + file_stat.mtime = Timestamp::from(mtime); +} diff --git a/clients/filesystem-fuse/src/config.rs b/clients/filesystem-fuse/src/config.rs new file mode 100644 index 00000000000..e60ad3cce53 --- /dev/null +++ b/clients/filesystem-fuse/src/config.rs @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use std::collections::{BTreeMap, HashMap}; + +pub(crate) struct Config { + pub(crate) fuse: FuseConfig, + pub(crate) filesystem: FilesystemConfig, + pub(crate) gravitino: GravitinoConfig, + pub(crate) extent_config: HashMap, +} + +impl Config { + pub(crate) fn new() -> Config { + Config { + fuse: FuseConfig { + mount_to: "/mnt/gravitino".to_string(), + mount_from: "/mnt/gravitino".to_string(), + default_mask: 0o777, + properties: HashMap::new(), + }, + filesystem: FilesystemConfig { + block_size: 4096, + }, + gravitino: GravitinoConfig { + gravitino_url: "http://localhost:8080".to_string(), + metalake: "http://localhost:8080".to_string(), + }, + extent_config: HashMap::new(), + } + } +} + +pub(crate) struct FuseConfig { + pub(crate) mount_to: String, + pub(crate) mount_from: String, + pub(crate) default_mask:u32, + pub(crate) properties: HashMap, +} + +pub(crate) struct FilesystemConfig { + pub(crate) block_size: u32 +} + +pub(crate) struct GravitinoConfig { + pub(crate) gravitino_url: String, + pub(crate) metalake: String, +} \ No newline at end of file diff --git a/clients/filesystem-fuse/src/error.rs b/clients/filesystem-fuse/src/error.rs new file mode 100644 index 00000000000..b59d32aa569 --- /dev/null +++ b/clients/filesystem-fuse/src/error.rs @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +#[derive(Debug)] +pub enum ErrorCode { + UnSupportedFilesystem, + GravitinoClientError, +} + +impl ErrorCode { + pub fn to_string(&self) -> String { + match self { + ErrorCode::UnSupportedFilesystem=> "The filesystem is not supported".to_string(), + _ => "".to_string(), + } + } + pub fn to_error(self, message: impl Into) -> GravitinoError { + GravitinoError::Error(self, message.into()) + } +} + +#[derive(Debug)] +pub enum GravitinoError { + RestError(String, reqwest::Error), + Error(ErrorCode, String), +} +impl From for GravitinoError { + fn from(err: reqwest::Error) -> Self { + GravitinoError::RestError("Http request failed:".to_owned() + &err.to_string(), err) + } +} \ No newline at end of file diff --git a/clients/filesystem-fuse/src/filesystem.rs b/clients/filesystem-fuse/src/filesystem.rs index a195aa8953d..2610486a04f 100644 --- a/clients/filesystem-fuse/src/filesystem.rs +++ b/clients/filesystem-fuse/src/filesystem.rs @@ -22,6 +22,7 @@ use crate::utils::join_file_path; use async_trait::async_trait; use bytes::Bytes; use fuse3::{Errno, FileType, Timestamp}; +use futures_util::{FutureExt, TryFutureExt}; use std::collections::HashMap; use std::sync::atomic::AtomicU64; use std::sync::{Mutex, RwLock}; @@ -103,6 +104,8 @@ pub trait PathFileSystem: Send + Sync { async fn remove_file(&self, parent: &str, name: &str) -> Result<()>; async fn remove_dir(&self, parent: &str, name: &str) -> Result<()>; + + fn get_capacity(&self) -> Result; } pub struct FileSystemContext { @@ -121,6 +124,34 @@ impl FileSystemContext { pub struct OpenFileFlags(u32); +impl OpenFileFlags { + pub fn is_read(&self) -> bool { + self.0 & libc::O_RDONLY as u32 == 0 || self.0 & libc::O_RDWR as u32 == 0 + } + + pub fn is_write(&self) -> bool { + self.0 & libc::O_WRONLY as u32 == 0 || self.0 & libc::O_RDWR as u32 == 0 + } + + pub fn is_append(&self) -> bool { + self.0 & libc::O_APPEND as u32 != 0 + } + + pub fn is_create(&self) -> bool { + self.0 & libc::O_CREAT as u32 != 0 + } + + pub fn is_truncate(&self) -> bool { + self.0 & libc::O_TRUNC as u32 != 0 + } + + pub fn is_exclusive(&self) -> bool { + self.0 & libc::O_EXCL as u32 != 0 + } +} + +pub struct FileSystemCapacity {} + #[derive(Clone, Debug)] pub struct FileStat { // inode id for the file system, also call file id @@ -155,6 +186,9 @@ pub struct FileStat { // file link count pub(crate) nlink: u32, + + // filestat timestamp after retrieved from original file system + pub(crate) timestamp: Timestamp, } impl FileStat { @@ -191,6 +225,7 @@ impl FileStat { mtime: atime, ctime: atime, nlink: 1, + timestamp: atime, } } @@ -208,6 +243,7 @@ impl FileStat { mtime: atime, ctime: atime, nlink: 1, + timestamp: atime, } } @@ -622,3 +658,201 @@ impl RawFileSystem for SimpleFileSystem { len } } + +/* +pub struct BasicFileSystem { + // meta is the metadata of the filesystem + meta: RwLock, + + // file_handle_manager is a manager for opened files. + file_handle_manager: RwLock, + + inode_id_generator: AtomicU64, + + fs: Box, +} + +impl BasicFileSystem { + const FILE_STAT_EXPIRE_TIME: i32 = 0; + + pub fn new(fs: Box) -> Self { + Self { + meta: RwLock::new(DefaultFileSystemMetadata::new()), + file_handle_manager: RwLock::new(FileHandleManager::new()), + inode_id_generator: AtomicU64::new(10000), + fs, + } + } + + fn next_inode_id(&self) -> u64 { + self.inode_id_generator + .fetch_add(1, std::sync::atomic::Ordering::SeqCst) + } + + fn get_and_update_file_stat(&self, file_id: u64) -> Result { + let file_stat = self + .meta + .read() + .unwrap() + .get_file(file_id) + .ok_or(Errno::from(libc::ENOENT))?; + + if timestamp_diff_from_now(&file_stat.timestamp) < Self::FILE_STAT_EXPIRE_TIME { + Ok(file_stat) + } else { + match file_stat.kind { + FileType::Directory => self.handle_file_stat_expired(&file_stat), + FileType::RegularFile => self.handle_file_stat_expired(&file_stat), + _ => Err(Errno::from(libc::ENOSYS)), + } + } + } + + fn handle_file_stat_expired(&self, old_file: &FileStat) -> Result { + self.fs.stat(&old_file.path).map(|new_stat| { + self.meta + .write() + .unwrap() + .update_file(old_file.inode, &new_stat); + new_stat + }) + } + + fn handle_dir_stat_expired(&self, old_dir: &FileStat) -> Result { + self.handle_file_stat_expired(old_dir); + let childs = self.fs.read_dir(&old_dir.path)?; + todo!() + } +} + +#[async_trait] +impl RawFileSystem for BasicFileSystem { + async fn init(&self) { + let mut meta = self.meta.write().unwrap(); + meta.init_root_dir(); + } + + async fn get_file_path(&self, file_id: u64) -> String { + let meta = self.meta.read().unwrap(); + meta.get_file_path(file_id) + } + + async fn get_opened_file(&self, _file_id: u64, fh: u64) -> Result { + let file_handle_map = self.file_handle_manager.read().unwrap(); + file_handle_map + .get_file(fh) + .ok_or(Errno::from(libc::ENOENT)) + } + + async fn stat(&self, file_id: u64) -> Result { + self.get_and_update_file_stat(file_id) + } + + async fn lookup(&self, parent_file_id: u64, name: &str) -> Result { + self.get_and_update_file_stat(parent_file_id)?; + self.meta + .read() + .unwrap() + .find_file(parent_file_id, name) + .ok_or(Errno::from(libc::ENOENT)) + } + + async fn read_dir(&self, file_id: u64) -> Result> { + self.get_and_update_file_stat(file_id)?; + let meta = self.meta.read().unwrap(); + Ok(meta.get_dir_childs(file_id)) + } + + async fn open_file(&self, file_id: u64) -> Result { + let meta = self.meta.read().unwrap(); + let file_stat = meta.get_file(file_id).ok_or(Errno::from(libc::ENOENT))?; + let mut file_handle_map = self.file_handle_manager.write().unwrap(); + let file_handle = file_handle_map.create_file(&file_stat); + Ok(file_handle) + } + + async fn create_file(&self, parent_file_id: u64, name: &str) -> Result { + let dir_stat = self.get_and_update_file_stat(parent_file_id)?; + + let mut file_stat = self.fs.create_file(&dir_stat.path, name)?; + file_stat.inode = self.next_inode_id(); + file_stat.parent_inode = parent_file_id; + + let mut meta = self.meta.write().unwrap(); + meta.put_file(&file_stat); + + let mut file_handle_map = self.file_handle_manager.write().unwrap(); + let file_handle = file_handle_map.create_file(&file_stat); + + Ok(file_handle) + } + + async fn create_dir(&self, parent_file_id: u64, name: &str) -> Result { + let dir_stat = self.get_and_update_file_stat(parent_file_id)?; + + let mut file_stat = self.fs.create_dir(&dir_stat.path, name)?; + file_stat.parent_inode = parent_file_id; + + let mut meta = self.meta.write().unwrap(); + meta.put_dir(&file_stat); + + let mut file_handle_map = self.file_handle_manager.write().unwrap(); + let file_handle = file_handle_map.create_file(&file_stat); + Ok(file_handle) + } + + async fn set_attr(&self, file_id: u64, file_stat: &FileStat) -> Result<()> { + let mut meta = self.meta.write().unwrap(); + meta.update_file(file_id, file_stat); + self.fs.set_attr(&file_stat.name, file_stat, true) + } + + async fn update_file_status(&self, file_id: u64, file_stat: &FileStat) -> Result<()> { + let mut meta = self.meta.write().unwrap(); + meta.update_file(file_id, file_stat); + self.fs.set_attr(&file_stat.name, file_stat, false) + } + + async fn remove_file(&self, parent_file_id: u64, name: &str) -> Result<()> { + let mut meta = self.meta.write().unwrap(); + let dir_stat = meta + .get_file(parent_file_id) + .ok_or(Errno::from(libc::ENOENT))?; + meta.remove_file(parent_file_id, name); + self.fs.remove_file(&dir_stat.path, name) + } + + async fn remove_dir(&self, parent_file_id: u64, name: &str) -> Result<()> { + let mut meta = self.meta.write().unwrap(); + let dir_stat = meta + .get_file(parent_file_id) + .ok_or(Errno::from(libc::ENOENT))?; + meta.remove_dir(parent_file_id, name); + self.fs.remove_dir(&dir_stat.path, name) + } + + async fn close_file(&self, _file_id: u64, fh: u64) -> Result<()> { + let mut file_handle_manager = self.file_handle_manager.write().unwrap(); + file_handle_manager.remove_file(fh); + Ok(()) + } + + async fn read(&self, file_id: u64, fh: u64) -> Box { + let file = { + let file_handle_map = self.file_handle_manager.read().unwrap(); + file_handle_map.get_file(fh).unwrap() + }; + + self.fs.read(&file) + } + + async fn write(&self, file_id: u64, fh: u64) -> Box { + let file = { + let file_handle_map = self.file_handle_manager.read().unwrap(); + file_handle_map.get_file(fh).unwrap() + }; + + self.fs.write(&file) + } +} + */ diff --git a/clients/filesystem-fuse/src/fuse_server.rs b/clients/filesystem-fuse/src/fuse_server.rs index 1e3b1738bca..6993547bc0a 100644 --- a/clients/filesystem-fuse/src/fuse_server.rs +++ b/clients/filesystem-fuse/src/fuse_server.rs @@ -16,17 +16,24 @@ * specific language governing permissions and limitations * under the License. */ +use crate::cloud_storage_filesystem::CloudStorageFileSystem; use crate::filesystem::{FileSystemContext, SimpleFileSystem}; use crate::fuse_api_handle::FuseApiHandle; +use crate::log_fuse_api_handle::LogFuseApiHandle; use crate::memory_filesystem::MemoryFileSystem; use fuse3::raw::{MountHandle, Session}; use fuse3::{MountOptions, Result}; use log::{error, info}; +use opendal::layers::LoggingLayer; +use opendal::{services, Operator}; use std::process::exit; use std::sync::Arc; use std::time::Duration; +use serde::__private::ser::constrain; use tokio::sync::{Mutex, Notify}; use tokio::time::timeout; +use crate::config::{Config, FilesystemConfig, FuseConfig, GravitinoConfig}; +use crate::gravitino_filesystem::GravitinoFileSystem; /// Represents a FUSE server capable of starting and stopping the FUSE filesystem. pub struct FuseServer { @@ -51,12 +58,17 @@ impl FuseServer { } /// Starts the FUSE filesystem and blocks until it is stopped. - pub async fn start(&self) -> Result<()> { - let fs = SimpleFileSystem::new(MemoryFileSystem::new()); + pub async fn start(&self,config: &Config) -> Result<()> { let uid = unsafe { libc::getuid() }; let gid = unsafe { libc::getgid() }; let fs_context = FileSystemContext { uid: uid, gid: gid }; - let fuse_fs = FuseApiHandle::new(fs, fs_context); + + let gvfs = GravitinoFileSystem::new(&config, &fs_context).await; + let fs = SimpleFileSystem::new(gvfs); + + //let fs = SimpleFileSystem::new(MemoryFileSystem::new()); + let fuse_fs = LogFuseApiHandle::new(FuseApiHandle::new(fs, fs_context)); + //let fuse_fs = FuseApiHandle::new(fs, fs_context); //check if the mount point exists if !std::path::Path::new(&self.mount_point).exists() { diff --git a/clients/filesystem-fuse/src/gravitino_client.rs b/clients/filesystem-fuse/src/gravitino_client.rs new file mode 100644 index 00000000000..541f044fc66 --- /dev/null +++ b/clients/filesystem-fuse/src/gravitino_client.rs @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use reqwest::Client; +use serde::Deserialize; +use std::collections::HashMap; +use std::fmt::{Debug, Display}; +use urlencoding::encode; +use crate::config::GravitinoConfig; +use crate::error::{ErrorCode, GravitinoError}; + +#[derive(Debug, Deserialize)] +pub(crate) struct Fileset { + pub(crate) name: String, + #[serde(rename = "type")] + pub(crate) fileset_type: String, + comment: String, + #[serde(rename = "storageLocation")] + pub(crate) storage_location: String, + properties: HashMap, +} + +#[derive(Debug, Deserialize)] +struct FilesetResponse { + code: i32, + fileset: Fileset, +} + +#[derive(Debug, Deserialize)] +struct FileLocationResponse { + code: i32, + #[serde(rename = "fileLocation")] + location: String, +} + +pub(crate) struct GravitinoClient { + gravitino_uri: String, + metalake: String, + + http_client: Client, +} + +impl GravitinoClient { + + pub fn new(config :&GravitinoConfig) -> Self { + Self { + gravitino_uri: config.gravitino_url.clone(), + metalake: config.metalake.clone(), + http_client: Client::new(), + } + } + + pub fn init(&self) {} + + pub fn do_post(&self, path: &str, data: &str) { + println!("POST request to {} with data: {}", path, data); + } + + pub fn do_get(&self, path: &str) { + println!("GET request to {}", path); + } + + pub fn request(&self, path: &str, data: &str) -> Result<(), GravitinoError> { + todo!() + } + + pub fn list_schema(&self) -> Result<(), GravitinoError> { + todo!() + } + + pub fn list_fileset(&self) -> Result<(), GravitinoError> { + todo!() + } + + fn get_fileset_url(&self, catalog_name: &str, schema_name: &str, fileset_name: &str) -> String { + format!( + "{}/api/metalakes/{}/catalogs/{}/schemas/{}/filesets/{}", + self.gravitino_uri, self.metalake, catalog_name, schema_name, fileset_name) + } + + async fn send_and_parse(&self, url: &str) -> Result + where + T: for<'de> Deserialize<'de>, + { + let http_resp = self.http_client.get(url).send().await.map_err(|e| { + GravitinoError::RestError(format!("Failed to send request to {}", url), e) + })?; + + let res = http_resp.json::().await.map_err(|e| { + GravitinoError::RestError(format!("Failed to parse response from {}", url), e) + })?; + + Ok(res) + } + + pub async fn get_fileset( + &self, + catalog_name: &str, + schema_name: &str, + fileset_name: &str, + ) -> Result { + let url = self.get_fileset_url(catalog_name, schema_name, fileset_name); + let res = self.send_and_parse::(&url).await?; + + if res.code != 0 { + return Err(GravitinoError::Error( + ErrorCode::GravitinoClientError, + "Failed to get fileset".to_string(), + )); + } + Ok(res.fileset) + } + + pub fn get_file_location_url(&self, catalog_name: &str, schema_name: &str, fileset_name: &str, path: &str) -> String { + let encoded_path = encode(path); + format!( + "{}/api/metalakes/{}/catalogs/{}/schemas/{}/filesets/{}/location?sub_path={}", + self.gravitino_uri, self.metalake, catalog_name, schema_name, fileset_name, path) + } + + pub async fn get_file_location( + &self, + catalog_name: &str, + schema_name: &str, + fileset_name: &str, + path: &str, + ) -> Result { + let url = self.get_file_location_url(catalog_name, schema_name, fileset_name, path); + let res = self.send_and_parse::(&url).await?; + + if res.code != 0 { + return Err(GravitinoError::Error( + ErrorCode::GravitinoClientError, + "Failed to get file location".to_string(), + )); + } + Ok(res.location) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use mockito::mock; + use tokio; + + #[tokio::test] + async fn test_get_fileset_success() { + tracing_subscriber::fmt::init(); + let fileset_response = r#" + { + "code": 0, + "fileset": { + "name": "example_fileset", + "type": "example_type", + "comment": "This is a test fileset", + "storageLocation": "/example/path", + "properties": { + "key1": "value1", + "key2": "value2" + } + } + }"#; + + let mock_server_url = &mockito::server_url(); + + let url = format!( + "/api/metalakes/{}/catalogs/{}/schemas/{}/filesets/{}", + "test", "catalog1", "schema1", "fileset1" + ); + let _m = mock("GET", url.as_str()) + .with_status(200) + .with_header("content-type", "application/json") + .with_body(fileset_response) + .create(); + + let config = GravitinoConfig { + gravitino_url: mock_server_url.to_string(), + metalake: "test".to_string(), + }; + let client = GravitinoClient::new(&config); + + let result = client.get_fileset("catalog1", "schema1", "fileset1").await; + + match result { + Ok(fileset) => { + assert_eq!(fileset.name, "example_fileset"); + assert_eq!(fileset.fileset_type, "example_type"); + assert_eq!(fileset.storage_location, "/example/path"); + assert_eq!(fileset.properties.get("key1"), Some(&"value1".to_string())); + } + Err(e) => panic!("Expected Ok, but got Err: {:?}", e), + } + } + + #[tokio::test] + async fn test_get_file_location_success() { + tracing_subscriber::fmt::init(); + let file_location_response = r#" + { + "code": 0, + "fileLocation": "/mybucket/a" + }"#; + + let mock_server_url = &mockito::server_url(); + + let url = format!( + "/api/metalakes/{}/catalogs/{}/schemas/{}/filesets/{}/location?sub_path={}", + "test", "catalog1", "schema1", "fileset1", "/example/path" + ); + let _m = mock("GET", url.as_str()) + .with_status(200) + .with_header("content-type", "application/json") + .with_body(file_location_response) + .create(); + + let config = GravitinoConfig { + gravitino_url: mock_server_url.to_string(), + metalake: "test".to_string(), + }; + let client = GravitinoClient::new(&config); + + let result = client.get_file_location("catalog1", "schema1", "fileset1", "/example/path").await; + + match result { + Ok(location) => { + assert_eq!(location, "/mybucket/a"); + } + Err(e) => panic!("Expected Ok, but got Err: {:?}", e), + } + } +} diff --git a/clients/filesystem-fuse/src/gravitino_compose_filesystem.rs b/clients/filesystem-fuse/src/gravitino_compose_filesystem.rs new file mode 100644 index 00000000000..fe3cd66a735 --- /dev/null +++ b/clients/filesystem-fuse/src/gravitino_compose_filesystem.rs @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use crate::filesystem::{ + FileReader, FileStat, FileSystemCapacity, FileWriter, OpenFileFlags, OpenedFile, + PathFileSystem, Result, +}; +use crate::gravitino_client::GravitinoClient; +use crate::storage_filesystem::StorageFileSystem; +use async_trait::async_trait; +use bytes::Bytes; +use dashmap::DashMap; + +pub(crate) struct GravitinoComposeFileSystem { + // meta is the metadata of the filesystem + client: GravitinoClient, + inner_fs: DashMap +} + +impl GravitinoComposeFileSystem { + pub fn new(client: GravitinoClient) -> Self { + Self { + client, + inner_fs: Default::default(), + } + } +} + +#[async_trait] +impl PathFileSystem for GravitinoComposeFileSystem { + async fn init(&self) { + todo!() + } + + async fn stat(&self, name: &str) -> Result { + todo!() + } + + async fn lookup(&self, parent: &str, name: &str) -> Result { + todo!() + } + + async fn read_dir(&self, name: &str) -> Result> { + todo!() + } + + async fn open_file(&self, name: &str, flags: OpenFileFlags) -> Result { + todo!() + } + + async fn open_dir(&self, name: &str, flags: OpenFileFlags) -> Result { + todo!() + } + + async fn create_file(&self, parent: &str, name: &str, flags: OpenFileFlags) -> Result { + todo!() + } + + async fn create_dir(&self, parent: &str, name: &str) -> Result { + todo!() + } + + async fn set_attr(&self, name: &str, file_stat: &FileStat, flush: bool) -> Result<()> { + todo!() + } + + async fn remove_file(&self, parent: &str, name: &str) -> Result<()> { + todo!() + } + + async fn remove_dir(&self, parent: &str, name: &str) -> Result<()> { + todo!() + } + + fn get_capacity(&self) -> Result { + todo!() + } +} + +struct FileReaderImpl { + reader: opendal::Reader, +} + +#[async_trait] +impl FileReader for FileReaderImpl { + async fn read(&mut self, offset: u64, size: u32) -> Result { + todo!() + } +} + +struct FileWriterImpl { + writer: opendal::Writer, +} + +#[async_trait] +impl FileWriter for FileWriterImpl { + async fn write(&mut self, offset: u64, data: &[u8]) -> Result { + todo!() + } + + async fn close(&mut self) -> Result<()> { + todo!() + } +} diff --git a/clients/filesystem-fuse/src/gravitino_filesystem.rs b/clients/filesystem-fuse/src/gravitino_filesystem.rs new file mode 100644 index 00000000000..62b4c215742 --- /dev/null +++ b/clients/filesystem-fuse/src/gravitino_filesystem.rs @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use crate::config::Config; +use crate::filesystem::{FileStat, FileSystemCapacity, FileSystemContext, OpenFileFlags, OpenedFile, PathFileSystem, Result}; +use crate::gravitino_client::GravitinoClient; +use crate::storage_filesystem::StorageFileSystem; +use crate::utils::{extract_fileset, extract_storage_filesystem}; +use async_trait::async_trait; + +pub(crate) struct GravitinoFileSystemConfig { +} + +pub(crate) struct GravitinoFileSystem { + fs : StorageFileSystem, + client: GravitinoClient, + fileset_location: String, +} + +impl GravitinoFileSystem { + pub async fn new(config: &Config, context: &FileSystemContext) -> Self { + let client = GravitinoClient::new(&config.gravitino); + let fileset_location = config.fuse.mount_from.clone(); + let (catalog, schema, fileset) = extract_fileset(&fileset_location).unwrap(); + let fileset = client.get_fileset(&catalog, &schema, &fileset).await.unwrap(); + + let (schema, location) = extract_storage_filesystem(&fileset.storage_location).unwrap(); + let fs = StorageFileSystem::new(&schema, &config, &context).await.unwrap(); + + Self { + fs: fs, + client: client, + fileset_location: location, + } + } + + fn map_fileset_location(&self, name: &str) -> String { + format!("{}/{}", self.fileset_location, name) + } +} + +#[async_trait] +impl PathFileSystem for GravitinoFileSystem { + async fn init(&self) { + self.fs.init().await; + } + + async fn stat(&self, name: &str) -> Result { + let name = self.map_fileset_location(name); + self.fs.stat(&name).await + } + + async fn lookup(&self, parent: &str, name: &str) -> Result { + let parent = self.map_fileset_location(parent); + self.fs.lookup(&parent, name).await + } + + + async fn read_dir(&self, name: &str) -> Result> { + let name = self.map_fileset_location(name); + self.fs.read_dir(&name).await + } + + async fn open_file(&self, name: &str, flags: OpenFileFlags) -> Result { + let name = self.map_fileset_location(name); + self.fs.open_file(&name, flags).await + } + + async fn open_dir(&self, name: &str, flags: OpenFileFlags) -> Result { + let name = self.map_fileset_location(name); + self.fs.open_dir(&name, flags).await + } + + async fn create_file(&self, parent: &str, name: &str, flags: OpenFileFlags) -> Result { + let parent = self.map_fileset_location(parent); + self.fs.create_file(&parent, name, flags).await + } + + async fn create_dir(&self, parent: &str, name: &str) -> Result { + let parent = self.map_fileset_location(parent); + self.fs.create_dir(&parent, name).await + } + + async fn set_attr(&self, name: &str, file_stat: &FileStat, flush: bool) -> Result<()> { + let name = self.map_fileset_location(name); + self.fs.set_attr(&name, file_stat, flush).await + } + + async fn remove_file(&self, parent: &str, name: &str) -> Result<()> { + let parent = self.map_fileset_location(parent); + self.fs.remove_file(&parent, name).await + } + + async fn remove_dir(&self, parent: &str, name: &str) -> Result<()> { + let parent = self.map_fileset_location(parent); + self.fs.remove_dir(&parent, name).await + } + + fn get_capacity(&self) -> Result { + self.fs.get_capacity() + } +} \ No newline at end of file diff --git a/clients/filesystem-fuse/src/lib.rs b/clients/filesystem-fuse/src/lib.rs index b9d4aa8be42..0f1090c275d 100644 --- a/clients/filesystem-fuse/src/lib.rs +++ b/clients/filesystem-fuse/src/lib.rs @@ -16,10 +16,18 @@ * specific language governing permissions and limitations * under the License. */ +mod cloud_storage_filesystem; mod filesystem; mod filesystem_metadata; mod fuse_api_handle; pub mod fuse_server; +mod gravitino_client; +mod log_fuse_api_handle; mod memory_filesystem; mod opened_file_manager; mod utils; +mod gravitino_compose_filesystem; +mod gravitino_filesystem; +mod storage_filesystem; +mod config; +mod error; diff --git a/clients/filesystem-fuse/src/log_fuse_api_handle.rs b/clients/filesystem-fuse/src/log_fuse_api_handle.rs new file mode 100644 index 00000000000..554ccc8231e --- /dev/null +++ b/clients/filesystem-fuse/src/log_fuse_api_handle.rs @@ -0,0 +1,565 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::filesystem::{FileStat, FileSystemContext, RawFileSystem}; +use crate::fuse_api_handle::FuseApiHandle; +use fuse3::path::prelude::{ReplyData, ReplyOpen, ReplyStatFs, ReplyWrite}; +use fuse3::path::Request; +use fuse3::raw::prelude::{ + FileAttr, ReplyAttr, ReplyCreated, ReplyDirectory, ReplyDirectoryPlus, ReplyEntry, ReplyInit, +}; +use fuse3::raw::reply::{DirectoryEntry, DirectoryEntryPlus}; +use fuse3::raw::Filesystem; +use fuse3::FileType::{Directory, RegularFile}; +use fuse3::{Errno, FileType, Inode, SetAttr, Timestamp}; +use futures_util::stream::BoxStream; +use futures_util::StreamExt; +use futures_util::{stream, FutureExt}; +use log::{debug, error, info}; +use std::ffi::{OsStr, OsString}; +use std::future::Future; +use std::num::NonZeroU32; +use std::time::{Duration, SystemTime}; + +pub(crate) struct LogFuseApiHandle { + handle: FuseApiHandle, +} + +impl LogFuseApiHandle { + pub fn new(handle: FuseApiHandle) -> Self { + Self { handle: handle } + } +} + +impl Filesystem for LogFuseApiHandle { + async fn init(&self, req: Request) -> fuse3::Result { + debug!( + "INIT req_id={} pid={} [{},{}]", + req.unique, req.pid, req.uid, req.gid + ); + + let result = self.handle.init(req.clone()).await; + match &result { + Ok(reply) => { + debug!( + "INIT reply: req_id={}, max_write={}", + req.unique, reply.max_write + ); + } + Err(e) => { + debug!("INIT failed: req_id={} error={:?}", req.unique, e); + } + } + result + } + + async fn destroy(&self, req: Request) { + debug!( + "DESTROY req_id={} pid={} [{},{}]", + req.unique, req.pid, req.uid, req.gid + ); + self.handle.destroy(req).await; + } + + async fn lookup(&self, req: Request, parent: Inode, name: &OsStr) -> fuse3::Result { + debug!( + "LOOKUP req_id={} pid={} [{}, {}] parent={}({}) name={}", + req.unique, + req.pid, + req.uid, + req.gid, + self.handle.get_file_path(parent).await, + parent, + name.to_string_lossy(), + ); + + let result = self.handle.lookup(req.clone(), parent, name).await; + + match &result { + Ok(reply) => { + debug!( + "LOOKUP reply: req_id={}, [ino={} gen={} attr={:?}]", + req.unique, reply.attr.ino, reply.generation, reply.attr + ); + } + Err(e) => { + error!("LOOKUP failed: req_id={}, [error={:?}]", req.unique, e); + } + } + result + } + + async fn getattr( + &self, + req: Request, + inode: Inode, + fh: Option, + flags: u32, + ) -> fuse3::Result { + debug!( + "GETATTR req_id={} pid={} [{}, {}] inode={} fh={:?} flags={}", + req.unique, req.pid, req.uid, req.gid, inode, fh, flags + ); + + let result = self.handle.getattr(req.clone(), inode, fh, flags).await; + + match &result { + Ok(reply) => { + debug!( + "GETATTR reply: req_id={} [attr={:?}]", + req.unique, reply.attr + ); + } + Err(e) => { + error!("GETATTR failed: req_id={} [error={:?}]", req.unique, e); + } + } + + result + } + + async fn setattr( + &self, + req: Request, + inode: Inode, + fh: Option, + set_attr: SetAttr, + ) -> fuse3::Result { + debug!( + "SETATTR req_id={} pid={} [{}, {}] inode={} fh={:?} set_attr={:?}", + req.unique, req.pid, req.uid, req.gid, inode, fh, set_attr + ); + + let result = self.handle.setattr(req.clone(), inode, fh, set_attr).await; + + match &result { + Ok(reply) => { + debug!( + "SETATTR reply: req_id={} [attr={:?}]", + req.unique, reply.attr + ); + } + Err(e) => { + error!("SETATTR failed: req_id={} [error={:?}]", req.unique, e); + } + } + + result + } + + async fn mkdir( + &self, + req: Request, + parent: Inode, + name: &OsStr, + mode: u32, + umask: u32, + ) -> fuse3::Result { + debug!( + "MKDIR req_id={} pid={} [{}, {}] parent={}({}) name={} mode={} umask={}", + req.unique, + req.pid, + req.uid, + req.gid, + self.handle.get_file_path(parent).await, + parent, + name.to_string_lossy(), + mode, + umask + ); + + let result = self + .handle + .mkdir(req.clone(), parent, name, mode, umask) + .await; + + match &result { + Ok(reply) => { + debug!( + "MKDIR reply: req_id={} [ino={} gen={} attr={:?}]", + req.unique, reply.attr.ino, reply.generation, reply.attr + ); + } + Err(e) => { + error!("MKDIR failed: req_id={} [error={:?}]", req.unique, e); + } + } + + result + } + + async fn unlink(&self, req: Request, parent: Inode, name: &OsStr) -> fuse3::Result<()> { + debug!( + "UNLINK req_id={} pid={} [{}, {}] parent={}({}) name={}", + req.unique, + req.pid, + req.uid, + req.gid, + self.handle.get_file_path(parent).await, + parent, + name.to_string_lossy() + ); + + let result = self.handle.unlink(req.clone(), parent, name).await; + + match &result { + Ok(_) => { + debug!("UNLINK reply: req_id={} [status=success]", req.unique); + } + Err(e) => { + error!("UNLINK failed: req_id={} [error={:?}]", req.unique, e); + } + } + + result + } + + async fn rmdir(&self, req: Request, parent: Inode, name: &OsStr) -> fuse3::Result<()> { + debug!( + "RMDIR req_id={} pid={} [{}, {}] parent={}({}) name={}", + req.unique, + req.pid, + req.uid, + req.gid, + self.handle.get_file_path(parent).await, + parent, + name.to_string_lossy() + ); + + let result = self.handle.rmdir(req.clone(), parent, name).await; + + match &result { + Ok(_) => { + debug!("RMDIR reply: req_id={} [status=success]", req.unique); + } + Err(e) => { + error!("RMDIR failed: req_id={} [error={:?}]", req.unique, e); + } + } + + result + } + + async fn open(&self, req: Request, inode: Inode, flags: u32) -> fuse3::Result { + debug!( + "OPEN req_id={} pid={} [{}, {}] inode={} flags={}", + req.unique, req.pid, req.uid, req.gid, inode, flags + ); + + let result = self.handle.open(req.clone(), inode, flags).await; + + match &result { + Ok(reply) => { + debug!( + "OPEN reply: req_id={} [fh={} flags={}]", + req.unique, reply.fh, reply.flags + ); + } + Err(e) => { + error!("OPEN failed: req_id={} [error={:?}]", req.unique, e); + } + } + + result + } + + async fn read( + &self, + req: Request, + inode: Inode, + fh: u64, + offset: u64, + size: u32, + ) -> fuse3::Result { + debug!( + "READ req_id={} pid={} [{}, {}] inode={} fh={} offset={} size={}", + req.unique, req.pid, req.uid, req.gid, inode, fh, offset, size + ); + + let result = self.handle.read(req.clone(), inode, fh, offset, size).await; + + match &result { + Ok(_) => { + debug!( + "READ reply: req_id={} [status=success size={}]", + req.unique, size + ); + } + Err(e) => { + error!("READ failed: req_id={} [error={:?}]", req.unique, e); + } + } + + result + } + + async fn write( + &self, + req: Request, + inode: Inode, + fh: u64, + offset: u64, + data: &[u8], + write_flags: u32, + flags: u32, + ) -> fuse3::Result { + debug!( + "WRITE req_id={} pid={} [{}, {}] inode={} fh={} offset={} size={} write_flags={} flags={}", + req.unique, + req.pid, + req.uid, + req.gid, + inode, + fh, + offset, + data.len(), + write_flags, + flags + ); + + let result = self + .handle + .write(req.clone(), inode, fh, offset, data, write_flags, flags) + .await; + + match &result { + Ok(reply) => { + debug!( + "WRITE reply: req_id={} [written_bytes={}]", + req.unique, reply.written, + ); + } + Err(e) => { + error!("WRITE failed: req_id={} [error={:?}]", req.unique, e); + } + } + + result + } + + async fn statfs(&self, req: Request, inode: Inode) -> fuse3::Result { + debug!( + "STATFS req_id={} pid={} [{}, {}] inode={}", + req.unique, req.pid, req.uid, req.gid, inode + ); + + let result = self.handle.statfs(req.clone(), inode).await; + + match &result { + Ok(reply) => { + debug!( + "STATFS reply: req_id={} [blocks={} bfree={} bavail={} files={} ffree={}]", + req.unique, reply.blocks, reply.bfree, reply.bavail, reply.files, reply.ffree + ); + } + Err(e) => { + error!("STATFS failed: req_id={} [error={:?}]", req.unique, e); + } + } + + result + } + + async fn release( + &self, + req: Request, + inode: Inode, + fh: u64, + flags: u32, + lock_owner: u64, + flush: bool, + ) -> fuse3::Result<()> { + debug!( + "RELEASE req_id={} pid={} [{}, {}] inode={} fh={} flags={} lock_owner={} flush={}", + req.unique, req.pid, req.uid, req.gid, inode, fh, flags, lock_owner, flush + ); + + let result = self + .handle + .release(req.clone(), inode, fh, flags, lock_owner, flush) + .await; + + match &result { + Ok(_) => { + debug!("RELEASE reply: req_id={} [status=success]", req.unique); + } + Err(e) => { + error!("RELEASE failed: req_id={} [error={:?}]", req.unique, e); + } + } + + result + } + + async fn opendir(&self, req: Request, inode: Inode, flags: u32) -> fuse3::Result { + debug!( + "OPENDIR req_id={} pid={} [{}, {}] inode={} flags={}", + req.unique, req.pid, req.uid, req.gid, inode, flags + ); + + let result = self.handle.opendir(req.clone(), inode, flags).await; + + match &result { + Ok(reply) => { + debug!( + "OPENDIR reply: req_id={} [fh={} flags={}]", + req.unique, reply.fh, reply.flags + ); + } + Err(e) => { + error!("OPENDIR failed: req_id={} [error={:?}]", req.unique, e); + } + } + + result + } + + type DirEntryStream<'a> + = BoxStream<'a, fuse3::Result> + where + T: 'a; + + async fn readdir<'a>( + &'a self, + req: Request, + parent: Inode, + fh: u64, + offset: i64, + ) -> fuse3::Result>> { + debug!( + "READDIR req_id={} pid={} [{}, {}] parent={} fh={} offset={}", + req.unique, req.pid, req.uid, req.gid, parent, fh, offset + ); + + let result = self.handle.readdir(req.clone(), parent, fh, offset).await; + + match &result { + Ok(_) => { + debug!("READDIR reply: req_id={} [status=success]", req.unique); + } + Err(e) => { + error!("READDIR failed: req_id={} [error={:?}]", req.unique, e); + } + } + + result + } + + async fn releasedir( + &self, + req: Request, + inode: Inode, + fh: u64, + flags: u32, + ) -> fuse3::Result<()> { + debug!( + "RELEASEDIR req_id={} pid={} [{}, {}] inode={} fh={} flags={}", + req.unique, req.pid, req.uid, req.gid, inode, fh, flags + ); + + let result = self.handle.releasedir(req.clone(), inode, fh, flags).await; + + match &result { + Ok(_) => { + debug!("RELEASEDIR reply: req_id={} [status=success]", req.unique); + } + Err(e) => { + error!("RELEASEDIR failed: req_id={} [error={:?}]", req.unique, e); + } + } + + result + } + + async fn create( + &self, + req: Request, + parent: Inode, + name: &OsStr, + mode: u32, + flags: u32, + ) -> fuse3::Result { + debug!( + "CREATE req_id={} pid={} [{}, {}] parent={}({}) name={} mode={} flags={}", + req.unique, + req.pid, + req.uid, + req.gid, + self.handle.get_file_path(parent).await, + parent, + name.to_string_lossy(), + mode, + flags + ); + + let result = self + .handle + .create(req.clone(), parent, name, mode, flags) + .await; + + match &result { + Ok(reply) => { + debug!( + "CREATE reply: req_id={} [ino={} gen={} fh={} flags={}]", + req.unique, reply.attr.ino, reply.generation, reply.fh, reply.flags + ); + } + Err(e) => { + error!("CREATE failed: req_id={} [error={:?}]", req.unique, e); + } + } + + result + } + + type DirEntryPlusStream<'a> + = BoxStream<'a, fuse3::Result> + where + T: 'a; + + async fn readdirplus<'a>( + &'a self, + req: Request, + parent: Inode, + fh: u64, + offset: u64, + lock_owner: u64, + ) -> fuse3::Result>> { + debug!( + "READDIRPLUS req_id={} pid={} [{}, {}] parent={} fh={} offset={} lock_owner={}", + req.unique, req.pid, req.uid, req.gid, parent, fh, offset, lock_owner + ); + + let result = self + .handle + .readdirplus(req.clone(), parent, fh, offset, lock_owner) + .await; + + match &result { + Ok(_) => { + debug!("READDIRPLUS reply: req_id={} [status=success]", req.unique); + } + Err(e) => { + error!("READDIRPLUS failed: req_id={} [error={:?}]", req.unique, e); + } + } + + result + } +} diff --git a/clients/filesystem-fuse/src/main.rs b/clients/filesystem-fuse/src/main.rs index 3bc655dc888..63734595002 100644 --- a/clients/filesystem-fuse/src/main.rs +++ b/clients/filesystem-fuse/src/main.rs @@ -16,26 +16,39 @@ * specific language governing permissions and limitations * under the License. */ +mod cloud_storage_filesystem; mod filesystem; mod filesystem_metadata; mod fuse_api_handle; mod fuse_server; +mod gravitino_client; +mod log_fuse_api_handle; mod memory_filesystem; mod opened_file_manager; mod utils; - +mod config; +mod error; +mod gravitino_compose_filesystem; +mod gravitino_filesystem; +mod storage_filesystem; use crate::fuse_server::FuseServer; use fuse3::Result; -use log::info; +use log::{debug, info}; use std::sync::Arc; +use crate::config::{Config}; #[tokio::main] async fn main() -> Result<()> { - tracing_subscriber::fmt().with_env_filter("debug").init(); + tracing_subscriber::fmt() + .with_env_filter("info,gvfs_fuse=debug,fuse3=warn") + .init(); + debug!("Starting gvfs-fuse server..."); let server = Arc::new(FuseServer::new("gvfs")); let clone_server = server.clone(); - let v = tokio::spawn(async move { clone_server.start().await }); + + let config = Config::new(); + let v = tokio::spawn(async move { clone_server.start(&config).await }); tokio::signal::ctrl_c().await?; info!("Received Ctrl+C, stopping server..."); diff --git a/clients/filesystem-fuse/src/memory_filesystem.rs b/clients/filesystem-fuse/src/memory_filesystem.rs index 8c4656e9ebe..3e4edccc0df 100644 --- a/clients/filesystem-fuse/src/memory_filesystem.rs +++ b/clients/filesystem-fuse/src/memory_filesystem.rs @@ -17,7 +17,8 @@ * under the License. */ use crate::filesystem::{ - FileReader, FileStat, FileWriter, OpenFileFlags, OpenedFile, PathFileSystem, Result, + FileReader, FileStat, FileSystemCapacity, FileWriter, OpenFileFlags, OpenedFile, + PathFileSystem, Result, }; use crate::filesystem_metadata::DefaultFileSystemMetadata; use crate::utils::join_file_path; @@ -50,8 +51,6 @@ impl MemoryFileSystem { } } - pub fn init(&self) {} - fn create_file_stat(&self, path: &str, file: &MemoryFile) -> FileStat { match file.kind { Directory => FileStat::new_dir_with_path(path), @@ -205,6 +204,10 @@ impl PathFileSystem for MemoryFileSystem { file_map.remove(&join_file_path(parent, name)); Ok(()) } + + fn get_capacity(&self) -> Result { + Ok(FileSystemCapacity {}) + } } pub(crate) struct MemoryFileReader { diff --git a/clients/filesystem-fuse/src/storage_filesystem.rs b/clients/filesystem-fuse/src/storage_filesystem.rs new file mode 100644 index 00000000000..9c4ea5a3589 --- /dev/null +++ b/clients/filesystem-fuse/src/storage_filesystem.rs @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use async_trait::async_trait; +use opendal::{services, Builder, Operator}; +use opendal::layers::LoggingLayer; +use crate::cloud_storage_filesystem::CloudStorageFileSystem; +use crate::config::Config; +use crate::error::{ErrorCode, GravitinoError}; +use crate::filesystem::{FileSystemContext, PathFileSystem}; +use crate::memory_filesystem::MemoryFileSystem; +use crate::filesystem::{FileStat, FileSystemCapacity, OpenFileFlags, OpenedFile, Result}; +use crate::utils::{GravitinoResult, StorageFileSystemType}; + +pub(crate) enum StorageFileSystem { + MemoryStorge(MemoryFileSystem), + CloudStorage(CloudStorageFileSystem), +} + +impl StorageFileSystem { + pub(crate) async fn new(fs_type: &StorageFileSystemType, config: &Config, context: &FileSystemContext) -> GravitinoResult { + match fs_type { + StorageFileSystemType::S3=> { + let builder = services::S3::from_map(config.extent_config.clone()); + let op = Operator::new(builder) + .expect("opendal create failed") + .layer(LoggingLayer::default()) + .finish(); + + let fs = CloudStorageFileSystem::new(op); + Ok(StorageFileSystem::CloudStorage(fs)) + } + _=> { + Err(ErrorCode::UnSupportedFilesystem.to_error(format!("Unsupported filesystem type: {}", fs_type))) + } + } + } +} + +macro_rules! async_call_fun { + ($self:expr, $fun:ident $(, $args:expr)* ) => { + match $self { + StorageFileSystem::MemoryStorge(fs) => fs.$fun($($args),*).await, + StorageFileSystem::CloudStorage(fs) => fs.$fun($($args),*).await, + } + }; +} + +macro_rules! call_fun { + ($self:expr, $fun:ident $(, $args:expr)* ) => { + match $self { + StorageFileSystem::MemoryStorge(fs) => fs.$fun($($args),*), + StorageFileSystem::CloudStorage(fs) => fs.$fun($($args),*), + } + }; +} + +#[async_trait] +impl PathFileSystem for StorageFileSystem { + async fn init(&self) { + async_call_fun!(self, init) + } + + async fn stat(&self, name: &str) -> Result { + async_call_fun!(self, stat, name) + } + + async fn lookup(&self, parent: &str, name: &str) -> Result { + async_call_fun!(self, lookup, parent, name) + } + + async fn read_dir(&self, name: &str) -> Result> { + async_call_fun!(self, read_dir, name) + } + + async fn open_file(&self, name: &str, flags: OpenFileFlags) -> Result { + async_call_fun!(self, open_file, name, flags) + } + + async fn open_dir(&self, name: &str, flags: OpenFileFlags) -> Result { + async_call_fun!(self, open_dir, name, flags) + } + + async fn create_file(&self, parent: &str, name: &str, flags: OpenFileFlags) -> Result { + async_call_fun!(self, create_file, parent, name, flags) + } + + async fn create_dir(&self, parent: &str, name: &str) -> Result { + async_call_fun!(self, create_dir, parent, name) + } + + async fn set_attr(&self, name: &str, file_stat: &FileStat, flush: bool) -> Result<()> { + async_call_fun!(self, set_attr, name, file_stat, flush) + } + + async fn remove_file(&self, parent: &str, name: &str) -> Result<()> { + async_call_fun!(self, remove_file, parent, name) + } + + async fn remove_dir(&self, parent: &str, name: &str) -> Result<()> { + async_call_fun!(self, remove_dir, parent, name) + } + + fn get_capacity(&self) -> Result { + call_fun!(self, get_capacity) + } +} diff --git a/clients/filesystem-fuse/src/utils.rs b/clients/filesystem-fuse/src/utils.rs index c51534b47af..eb893684015 100644 --- a/clients/filesystem-fuse/src/utils.rs +++ b/clients/filesystem-fuse/src/utils.rs @@ -16,6 +16,24 @@ * specific language governing permissions and limitations * under the License. */ +use std::fmt; +use fuse3::Timestamp; +use std::time::SystemTime; +use crate::error::GravitinoError; + +pub type GravitinoResult = Result; + +pub enum StorageFileSystemType { + S3, +} + +impl fmt::Display for StorageFileSystemType { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + StorageFileSystemType::S3 => write!(f, "s3"), + } + } +} pub fn join_file_path(parent: &str, name: &str) -> String { if parent.is_empty() { @@ -24,3 +42,46 @@ pub fn join_file_path(parent: &str, name: &str) -> String { format!("{}/{}", parent, name) } } + +pub fn timestamp_diff_from_now(timestamp: Timestamp) -> i64 { + let now = Timestamp::from(SystemTime::now()); + timestamp.sec - now.sec +} + + +pub fn extract_fileset(path: &str) -> Option<(String, String, String)> { + let prefix = "gvfs://fileset/"; + if !path.starts_with(prefix) { + return None; + } + + let path_without_prefix = &path[prefix.len()..]; + + let parts: Vec<&str> = path_without_prefix.split('/').collect(); + + if parts.len() < 3 { + return None; + } + + let catalog = parts[0].to_string(); + let schema = parts[1].to_string(); + let fileset = parts[2].to_string(); + + Some((catalog, schema, fileset)) +} + +pub fn extract_storage_filesystem(path: &str) -> Option<(StorageFileSystemType, String)> { + if let Some(pos) = path.find("://") { + let protocol = &path[..pos]; + let location = &path[pos + 3..]; + match protocol { + "s3" => { + Some((StorageFileSystemType::S3, location.to_string())) + }, + _ => None, + } + } else { + None + } +} + diff --git a/clients/filesystem-fuse/tests/fuse_test.rs b/clients/filesystem-fuse/tests/fuse_test.rs index eb2a6a7b8f1..577f442d5ff 100644 --- a/clients/filesystem-fuse/tests/fuse_test.rs +++ b/clients/filesystem-fuse/tests/fuse_test.rs @@ -22,6 +22,7 @@ use log::info; use std::fs; use std::fs::File; use std::io::Write; +use std::os::unix::fs::MetadataExt; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::thread::sleep; @@ -56,6 +57,7 @@ impl FuseTest { while start_time.elapsed() < timeout { if let Ok(exists) = fs::exists(&test_file) { + info!("Wait for fuse server ready: {}", exists); if exists { return true; } @@ -92,7 +94,12 @@ fn test_fuse_system_with_auto() { test_fuse_filesystem(mount_point); } +fn test_fuse_system_with_manual() { + test_fuse_filesystem("build/gvfs"); +} + fn test_fuse_filesystem(mount_point: &str) { + info!("Test startup"); let base_path = Path::new(mount_point); //test create file