Skip to content

Commit

Permalink
file upload & download from node to/from remote device
Browse files Browse the repository at this point in the history
  • Loading branch information
vesellov committed Jan 4, 2025
1 parent bb004a1 commit ab58c17
Show file tree
Hide file tree
Showing 15 changed files with 277 additions and 107 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,18 @@
Change Log
==========

2025-01-04 Veselin Penev [penev.veselin@gmail.com](mailto:penev.veselin@gmail.com)

* built api.chunk_write() method to make device-to-node file transfer possible
* built api.chunk_read() method to make node-to-device file transfer possible
* handling situations when wrong server or client code was entered during web socket device handshake procedure
* minor updates in api.file_* methods
* solved threading issue in routed_web_socket()
* bug fixes in DHT and service_web_socket_communicator()
* minor fix in keys_synchronizer()



2024-12-30 Veselin Penev [penev.veselin@gmail.com](mailto:penev.veselin@gmail.com)

* new approach to run BitDust on mobile devices
Expand Down
152 changes: 96 additions & 56 deletions bitdust/interface/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,64 @@ def request_model_data(model_name, query_details=None):
#------------------------------------------------------------------------------


def chunk_read(path, offset, max_size=1024*32):
    """
    Requests chunk of data from a local file. Used to download a file via WebSocket stream.
    Binary data is encoded to a text string using "latin1" encoding.
    The "path" must be pointing to a location inside of the "~/.bitdust/temp/" folder.
    ###### WebSocket
    websocket.send('{"command": "api_call", "method": "chunk_read", "kwargs": {"path": "/tmp/cat.png", "offset": 1000, "max_size": 8192} }');
    """
    import os
    from bitdust.stream import chunk
    from bitdust.system import tmpfile
    # normalize the input path first so that ".." components can not escape the
    # temp folder, and compare with a trailing separator so that a sibling folder
    # sharing the same prefix (e.g. "/tmp/bitdust-evil" vs "/tmp/bitdust") is rejected
    base_dir = os.path.abspath(tmpfile.base_dir())
    real_path = os.path.abspath(path)
    if real_path != base_dir and not real_path.startswith(base_dir + os.sep):
        return ERROR('wrong path location provided')
    try:
        raw_data = chunk.data_read(file_path=real_path, offset=offset, max_size=max_size, to_text=True)
    except Exception as exc:
        return ERROR(exc)
    if not raw_data:
        # an empty read means the end of the file was reached
        return OK({'chunk': '', 'completed': True})
    return OK({'chunk': raw_data})


def chunk_write(data, path=None):
    """
    Writes chunk of data to a local file. Used to upload a file via WebSocket stream.
    Text data is decoded to a binary string using "latin1" encoding.
    When "path" argument is empty a new file will be opened in a temporarily location,
    response will include full local path to the file.
    When the "path" is present it must be pointing to a location inside of the "~/.bitdust/temp/" folder.
    ###### WebSocket
    websocket.send('{"command": "api_call", "method": "chunk_write", "kwargs": {"path": "/tmp/cat.png", "data": "ABCD1234"} }');
    """
    import os
    from bitdust.stream import chunk
    from bitdust.system import tmpfile
    file_path = path
    if path:
        # normalize before the containment check so that ".." components can not
        # escape the temp folder, and compare with a trailing separator so that a
        # sibling folder sharing the same prefix is rejected as well
        base_dir = os.path.abspath(tmpfile.base_dir())
        file_path = os.path.abspath(path)
        if file_path != base_dir and not file_path.startswith(base_dir + os.sep):
            return ERROR('wrong path location provided')
    else:
        # no target given: create a fresh temp file and report its path back
        _, file_path = tmpfile.make('upload', close_fd=True)
    try:
        chunk.data_write(file_path=file_path, data=data, from_text=True)
    except Exception as exc:
        return ERROR(exc)
    if not path:
        return OK({'path': file_path})
    return OK()


#------------------------------------------------------------------------------


def process_stop(instant=True):
"""
Stop the main process immediately.
Expand Down Expand Up @@ -2020,18 +2078,6 @@ def files_list(remote_path=None, key_id=None, recursive=True, all_customers=Fals
remotePath = bpio.remotePath(norm_path['path'])
customer_idurl = norm_path['idurl']
key_alias = norm_path['key_alias'] if not key_id else key_id.split('$')[0]
if _Debug:
lg.out(
_DebugLevel, 'api.files_list remote_path=%s key_id=%s key_alias=%s recursive=%s all_customers=%s include_uploads=%s include_downloads=%s' % (
remote_path,
key_id,
key_alias,
recursive,
all_customers,
include_uploads,
include_downloads,
)
)
if not all_customers and customer_idurl not in backup_fs.known_customers():
return ERROR('customer %s was not found' % customer_idurl)
backup_info_callback = None
Expand Down Expand Up @@ -2115,7 +2161,7 @@ def files_list(remote_path=None, key_id=None, recursive=True, all_customers=Fals
'pending': [],
},
'downloads': [],
'local_path': os.path.join(local_dir, i['name']),
'local_path': os.path.join(local_dir, bpio.remotePath(i['path'])),
}
if include_uploads:
backup_control.tasks()
Expand Down Expand Up @@ -2155,25 +2201,25 @@ def files_list(remote_path=None, key_id=None, recursive=True, all_customers=Fals
r['uploads']['running'] = running
r['uploads']['pending'] = pending
if include_downloads:
from bitdust.storage import restore_monitor
downloads = []
for backupID in restore_monitor.FindWorking(pathID=full_glob_id):
d = restore_monitor.GetWorkingRestoreObject(backupID)
if d:
downloads.append(
{
'backup_id': d.backup_id,
'creator_id': d.creator_id,
'path_id': d.path_id,
'version': d.version,
'block_number': d.block_number,
'bytes_processed': d.bytes_written,
'created': time.asctime(time.localtime(d.Started)),
'aborted': d.abort_flag,
'done': d.done_flag,
'eccmap': '' if not d.EccMap else d.EccMap.name,
}
)
if driver.is_on('service_restores'):
for backupID in restore_monitor.FindWorking(pathID=full_glob_id):
d = restore_monitor.GetWorkingRestoreObject(backupID)
if d:
downloads.append(
{
'backup_id': d.backup_id,
'creator_id': d.creator_id,
'path_id': d.path_id,
'version': d.version,
'block_number': d.block_number,
'bytes_processed': d.bytes_written,
'created': time.asctime(time.localtime(d.Started)),
'aborted': d.abort_flag,
'done': d.done_flag,
'eccmap': '' if not d.EccMap else d.EccMap.name,
}
)
r['downloads'] = downloads
result.append(r)
if _Debug:
Expand Down Expand Up @@ -2296,6 +2342,7 @@ def file_info(remote_path, include_uploads=True, include_downloads=True):
'customer': norm_path['customer'],
'path_id': pathID,
'path': remotePath,
'name': item.name(),
'type': backup_fs.TYPES.get(item.type, '').lower(),
'size': item_size,
'latest': item_time,
Expand All @@ -2306,7 +2353,7 @@ def file_info(remote_path, include_uploads=True, include_downloads=True):
'pending': [],
},
'downloads': [],
'local_path': os.path.join(settings.getRestoreDir(), item.name()),
'local_path': os.path.join(settings.getRestoreDir(), remotePath),
}
if include_uploads:
backup_control.tasks()
Expand Down Expand Up @@ -2346,9 +2393,8 @@ def file_info(remote_path, include_uploads=True, include_downloads=True):
r['uploads']['running'] = running
r['uploads']['pending'] = pending
if include_downloads:
downloads = []
if driver.is_on('service_restores'):
from bitdust.storage import restore_monitor
downloads = []
for backupID in restore_monitor.FindWorking(pathID=pathID):
d = restore_monitor.GetWorkingRestoreObject(backupID)
if d:
Expand All @@ -2366,7 +2412,7 @@ def file_info(remote_path, include_uploads=True, include_downloads=True):
'eccmap': '' if not d.EccMap else d.EccMap.name,
}
)
r['downloads'] = downloads
r['downloads'] = downloads
if _Debug:
lg.out(_DebugLevel, 'api.file_info : %r' % pathID)
r['revision'] = backup_fs.revision()
Expand Down Expand Up @@ -2812,9 +2858,6 @@ def files_downloads():
if not driver.is_on('service_backups'):
return ERROR('service_backups() is not started')
from bitdust.storage import restore_monitor
if _Debug:
lg.out(_DebugLevel, 'api.files_downloads')
lg.out(_DebugLevel, ' %d items downloading at the moment' % len(restore_monitor.GetWorkingObjects()))
return RESULT(
[
{
Expand Down Expand Up @@ -2859,14 +2902,13 @@ def file_download_start(remote_path, destination_path=None, wait_result=False, p
"""
if not driver.is_on('service_restores'):
return ERROR('service_restores() is not started')
if _Debug:
lg.out(_DebugLevel, 'api.file_download_start remote_path=%s destination_path=%s wait_result=%s' % (remote_path, destination_path, wait_result))
from bitdust.storage import backup_fs
from bitdust.storage import backup_control
from bitdust.storage import restore_monitor
from bitdust.system import bpio
from bitdust.system import tmpfile
from bitdust.lib import packetid
from bitdust.main import settings
# from bitdust.main import settings
from bitdust.userid import my_id
from bitdust.userid import global_id
from bitdust.crypt import my_keys
Expand All @@ -2891,7 +2933,7 @@ def file_download_start(remote_path, destination_path=None, wait_result=False, p
if not version:
version = item.get_latest_version()
if not version:
return ERROR('did not found any remote versions for %s' % remote_path)
return ERROR('no remotely stored versions found')
if item.key_id:
key_alias = packetid.KeyAlias(item.key_id)
customerGlobalID = global_id.MakeGlobalID(customer=glob_path['customer'], key_alias=key_alias)
Expand All @@ -2908,7 +2950,7 @@ def file_download_start(remote_path, destination_path=None, wait_result=False, p
if not version:
version = item.get_latest_version()
if not version:
return ERROR('did not found any remote versions for %s' % remote_path)
return ERROR('no remotely stored versions found')
if item.key_id:
key_alias = packetid.KeyAlias(item.key_id)
customerGlobalID = global_id.MakeGlobalID(customer=glob_path['customer'], key_alias=key_alias)
Expand All @@ -2926,10 +2968,12 @@ def file_download_start(remote_path, destination_path=None, wait_result=False, p
))
if not knownPath:
return ERROR('location %s was not found in the catalog' % knownPath)
# if not destination_path:
# destination_path = settings.getRestoreDir()
# if not destination_path:
# destination_path = settings.DefaultRestoreDir()
if not destination_path:
destination_path = settings.getRestoreDir()
if not destination_path:
destination_path = settings.DefaultRestoreDir()
destination_path = tmpfile.make_dir('download')
key_id = my_keys.make_key_id(alias=keyAlias, creator_glob_id=customerGlobalID)
ret = Deferred()

Expand All @@ -2941,7 +2985,7 @@ def _on_result(backupID, result):
'downloaded': True,
'key_id': key_id,
'backup_id': backupID,
'local_path': destination_path,
'local_path': os.path.join(destination_path, glob_path['path']),
'path_id': pathID_target,
'remote_path': knownPath,
},
Expand All @@ -2957,7 +3001,7 @@ def _on_result(backupID, result):
'downloaded': False,
'key_id': key_id,
'backup_id': backupID,
'local_path': destination_path,
'local_path': os.path.join(destination_path, glob_path['path']),
'path_id': pathID_target,
'remote_path': knownPath,
},
Expand All @@ -2972,18 +3016,14 @@ def _start_restore():
if wait_result:
restore_monitor.Start(backupID, destination_path, keyID=key_id, callback=_on_result)
return ret
restore_monitor.Start(
backupID,
destination_path,
keyID=key_id,
)
restore_monitor.Start(backupID, destination_path, keyID=key_id)
ret.callback(
OK(
{
'downloaded': False,
'key_id': key_id,
'backup_id': backupID,
'local_path': destination_path,
'local_path': os.path.join(destination_path, glob_path['path']),
'path_id': pathID_target,
'remote_path': knownPath,
},
Expand All @@ -3006,7 +3046,7 @@ def _on_share_connected(active_share, callback_id, result):
details={
'key_id': active_share.key_id,
'backup_id': backupID,
'local_path': destination_path,
'local_path': os.path.join(destination_path, glob_path['path']),
'path_id': pathID_target,
'remote_path': knownPath,
},
Expand Down Expand Up @@ -3110,7 +3150,7 @@ def file_download_stop(remote_path):
for version in versions:
backupIDs.append(packetid.MakeBackupID(glob_path['customer'], knownPathID, version, key_alias=key_alias))
if not backupIDs:
return ERROR('did not found any remote versions for %s' % remote_path)
return ERROR('no remotely stored versions found')
r = []
for backupID in backupIDs:
r.append({
Expand Down
15 changes: 11 additions & 4 deletions bitdust/interface/api_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,12 +400,20 @@ def start_routed_devices():
start_device(device_name)


def stop_devices():
def stop_routed_devices():
    """
    Stops every running device instance that is configured as "routed".
    """
    for name in devices():
        key_obj = devices(name)
        # skip unknown devices, non-routed devices, and devices with no running instance
        if key_obj and key_obj.meta['routed'] and instances(name):
            stop_device(name)


def stop_devices():
    """
    Stops all device instances that are currently running, routed or not.
    """
    for name in devices():
        if not instances(name):
            continue
        stop_device(name)


#------------------------------------------------------------------------------
Expand Down Expand Up @@ -581,7 +589,6 @@ def push(json_data):
#------------------------------------------------------------------------------

if __name__ == '__main__':
from twisted.internet import reactor
settings.init()
lg.set_debug_level(24)
automat.init()
Expand Down
Loading

0 comments on commit ab58c17

Please sign in to comment.