This repository has been archived by the owner on Oct 1, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfile_scanner_module.py
108 lines (91 loc) · 3.98 KB
/
file_scanner_module.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
""" This module compares hashes of a file to a given list of hashes

This module contains a class that scans a file by computing its hash and looking
that hash up in a larger list of known hashes. If the hash is found, the file is
considered a virus; otherwise it is considered clean.
"""
import os.path
import time
import asyncio
import xxhash
import mmap
from database_api import HashAPI
class FileScanner:
    """Scan a file or a directory tree for files whose hash is a known signature.

    Every visited file is hashed with xxHash64 and the hex digest is looked up
    through ``HashAPI.hash_exists``. Files whose digest is reported as present
    are recorded as dirty.

    Attributes:
        amount_of_files: Number of files visited by the scan so far.
        hasher: ``HashAPI`` instance used for the signature lookups.
        dirty_files: Paths of the files whose hash was reported by ``hasher``.
        path: File or directory this scanner was created for.
    """

    amount_of_files = 0
    hasher: "HashAPI"
    # Annotation only: the list itself is created per instance in __init__ so
    # that scan results are never shared between two scanners.
    dirty_files: list[str]
    path = ""

    def __init__(self, path, db_location):
        """Validate ``path`` and set up the scanner state.

        Args:
            path: Existing file or directory to scan. A directory is walked
                recursively, a single file is scanned on its own.
            db_location: Passed unchanged to the ``HashAPI`` constructor.

        Raises:
            IOError: If ``path`` does not exist.
        """
        print("Scanner initialized")
        if not os.path.exists(path):
            print(f"Path: dir ? {os.path.isdir(path)} & exists ?{os.path.exists(path)}")
            raise IOError("Invalid path or path not a directory")
        self.path = path
        self.amount_of_files = 0
        self.dirty_files = []
        self.hasher = HashAPI(db_location)

    def _is_known_hash(self, file_hash):
        """Return True when ``file_hash`` is a real digest that ``hasher`` reports as present."""
        # calculate_xxhash yields None for empty/unreadable files; there is
        # nothing to look up in that case.
        return file_hash is not None and self.hasher.hash_exists(file_hash)

    async def search_files(self, directory):
        """Hash every file below ``directory`` and record the dirty ones.

        Args:
            directory: Root directory handed to ``os.walk``.
        """
        for root, _dirs, files in os.walk(directory):
            for file in files:
                self.amount_of_files += 1
                file_path = os.path.join(root, file)
                xxhash_hash = await self.calculate_xxhash(file_path)
                print(f"{xxhash_hash}: {file_path}")
                if self._is_known_hash(xxhash_hash):
                    print(f"{file_path}: {xxhash_hash}")
                    self.dirty_files.append(file_path)

    @staticmethod
    async def calculate_xxhash(file_path):
        """Return the xxHash64 hex digest of a file's contents.

        Args:
            file_path: Path of the file to hash.

        Returns:
            The hex digest string, or ``None`` when the file is empty or cannot
            be inspected or read.
        """
        try:
            # An empty file cannot be memory-mapped, so it is skipped up front.
            if os.stat(file_path).st_size == 0:
                return None
            with open(file_path, "rb") as fp:
                with mmap.mmap(fp.fileno(), 0, access=mmap.ACCESS_READ) as mapped:
                    return xxhash.xxh64(mapped).hexdigest()
        except (OSError, ValueError):
            # OSError: missing file, broken symlink, permission problem.
            # ValueError: file became empty between the stat and the mmap call.
            print(f"Error: Could not open {file_path}. Do you have the necessary permissions?")
            return None

    async def scan_files(self):
        """Scan ``self.path``: walk it when it is a directory, else hash the single file."""
        if os.path.isdir(self.path):
            await self.search_files(self.path)
            return
        self.amount_of_files += 1
        xxhash_hash = await self.calculate_xxhash(self.path)
        if self._is_known_hash(xxhash_hash):
            self.dirty_files.append(self.path)

    def start_scanner(self):
        """Run the scan to completion on an event loop and print a summary.

        The summary contains the number of scanned files, the number of dirty
        files, the scanned path and the elapsed wall-clock time.
        """
        print("Starting scanner... Please wait")
        tic = time.perf_counter()
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError as ex:
            print(f"Asyncio error: {ex}")
            loop = None
        # A missing or already-closed loop cannot run the scan; install a fresh one.
        if loop is None or loop.is_closed():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        loop.run_until_complete(self.scan_files())
        toc = time.perf_counter()
        print(
            "\nScanner finished! \n"
            f"Scanned files: {self.amount_of_files} \n"
            f"Bad files: {len(self.dirty_files)} \n"
            f"Scanned path: {self.path} \n"
            f"Execution time: {toc - tic:0.4f} seconds"
        )