From 4233899c413288f18d6bcb232b98b832f0827a8e Mon Sep 17 00:00:00 2001 From: Thomas Eiszler Date: Tue, 20 Jun 2017 09:51:30 -0400 Subject: [PATCH] A bit of code cleanup; Added signal for termination of VM started by stream server. --- bin/cloudlet | 6 +- bin/handoff-server-proc | 409 +--------------------- elijah/provisioning/compression.py | 4 +- elijah/provisioning/configuration.py | 2 +- elijah/provisioning/server.py | 2 +- elijah/provisioning/stream_server.py | 81 +---- elijah/provisioning/synthesis_protocol.py | 2 +- 7 files changed, 26 insertions(+), 480 deletions(-) diff --git a/bin/cloudlet b/bin/cloudlet index e8da7aa2..dd801c5d 100755 --- a/bin/cloudlet +++ b/bin/cloudlet @@ -128,7 +128,7 @@ def main(argv): CMD_BASE_CREATION = "base" CMD_OVERLAY_CREATION = "overlay" - CMD_SYNTEHSIS = "synthesis" + CMD_SYNTHESIS = "synthesis" CMD_LIST_BASE = "list-base" CMD_DEL_BASE = "del-base" CMD_ADD_BASE = "add-base" @@ -142,7 +142,7 @@ def main(argv): commands = { CMD_BASE_CREATION: "create new base VM", CMD_OVERLAY_CREATION: "create new overlay VM on top of base VM", - CMD_SYNTEHSIS: "test created overlay using command line", + CMD_SYNTHESIS: "test created overlay using command line", CMD_LIST_BASE: "show all base VM at this machine", CMD_ADD_BASE: "add existing base vm to DB", CMD_DEL_BASE: "delete base vm at database", @@ -225,7 +225,7 @@ def main(argv): "%s\nFailed to create overlay" % str( traceback.format_exc())) return 1 - elif mode == CMD_SYNTEHSIS: + elif mode == CMD_SYNTHESIS: if len(left_args) < 2: sys.stderr.write("\nSynthesis requires path to VM disk and overlay-meta\n \ Ex) ./cloudlet synthesis /path/to/VM_disk /path/to/precise.overlay-meta [options]\n") diff --git a/bin/handoff-server-proc b/bin/handoff-server-proc index 62f72998..487a2499 100755 --- a/bin/handoff-server-proc +++ b/bin/handoff-server-proc @@ -19,425 +19,32 @@ # import os -import functools import traceback import sys import time import struct -import Queue import SocketServer import socket -import subprocess - -import tempfile import multiprocessing -import threading from optparse import OptionParser -from hashlib import sha256 if os.path.exists("../elijah") is True: sys.path.insert(0, "../") from elijah.provisioning.synthesis import validate_congifuration -from elijah.provisioning import delta -from elijah.provisioning.delta import DeltaItem from elijah.provisioning.server import NetworkUtil from elijah.provisioning.synthesis_protocol import Protocol as Protocol -from elijah.provisioning.synthesis import run_fuse -from elijah.provisioning.synthesis import SynthesizedVM -from elijah.provisioning.synthesis import connect_vnc from elijah.provisioning.handoff import HandoffDataRecv - -#import synthesis as synthesis -#from package import VMOverlayPackage -from elijah.provisioning.db.api import DBConnector -from elijah.provisioning.db.table_def import BaseVM +from elijah.provisioning.stream_server import RecoverDeltaProc, StreamSynthesisError from elijah.provisioning.configuration import Const as Cloudlet_Const from elijah.provisioning.compression import DecompProc -from elijah.provisioning import tool from elijah.provisioning import log as logging -import mmap from pprint import pformat LOG = logging.getLogger(__name__) session_resources = dict() # dict[session_id] = obj(SessionResource) - -def wrap_process_fault(function): - """Wraps a method to catch exceptions related to instances. - This decorator wraps a method to catch any exceptions and - terminate the request gracefully. 
- """ - @functools.wraps(function) - def decorated_function(self, *args, **kwargs): - try: - return function(self, *args, **kwargs) - except Exception as e: - if hasattr(self, 'exception_handler'): - self.exception_handler() - kwargs.update(dict(zip(function.func_code.co_varnames[2:], args))) - LOG.error("failed with : %s" % str(kwargs)) - - return decorated_function - - -def try_except(fn): - def wrapped(*args, **kwargs): - try: - return fn(*args, **kwargs) - except Exception as e: - et, ei, tb = sys.exc_info() - raise MyError, MyError(e), tb - return wrapped - - -class StreamSynthesisError(Exception): - pass - - -class AckThread(threading.Thread): - - def __init__(self, request): - self.request = request - self.ack_queue = Queue.Queue() - threading.Thread.__init__(self, target=self.start_sending_ack) - - def start_sending_ack(self): - while True: - data = self.ack_queue.get() - bytes_recved = self.ack_queue.get() - # send ack - ack_data = struct.pack("!Q", bytes_recved) - self.request.sendall(ack_data) - - def signal_ack(self): - self.ack_queue.put(bytes_recved) - - -class RecoverDeltaProc(multiprocessing.Process): - FUSE_INDEX_DISK = 1 - FUSE_INDEX_MEMORY = 2 - - def __init__(self, base_disk, base_mem, - decomp_delta_queue, output_mem_path, - output_disk_path, chunk_size, - fuse_info_queue): - if base_disk is None and base_mem is None: - raise StreamSynthesisError("Need either base_disk or base_memory") - - self.decomp_delta_queue = decomp_delta_queue - self.output_mem_path = output_mem_path - self.output_disk_path = output_disk_path - self.fuse_info_queue = fuse_info_queue - self.base_disk = base_disk - self.base_mem = base_mem - - self.base_disk_fd = None - self.base_mem_fd = None - self.raw_disk = None - self.raw_mem = None - self.mem_overlay_dict = None - self.raw_mem_overlay = None - self.chunk_size = chunk_size - self.zero_data = struct.pack("!s", chr(0x00)) * chunk_size - self.recovered_delta_dict = dict() - self.recovered_hash_dict = dict() - self.live_migration_iteration_dict = dict() - - multiprocessing.Process.__init__(self, target=self.recover_deltaitem) - - def recover_deltaitem(self): - time_start = time.time() - - # initialize reference data to use mmap - count = 0 - self.base_disk_fd = open(self.base_disk, "rb") - self.raw_disk = mmap.mmap( - self.base_disk_fd.fileno(), - 0, prot=mmap.PROT_READ) - self.base_mem_fd = open(self.base_mem, "rb") - self.raw_mem = mmap.mmap( - self.base_mem_fd.fileno(), - 0, prot=mmap.PROT_READ) - self.recover_mem_fd = open(self.output_mem_path, "wrb") - self.recover_disk_fd = open(self.output_disk_path, "wrb") - - unresolved_deltaitem_list = [] - while True: - recv_data = self.decomp_delta_queue.get() - if recv_data == Cloudlet_Const.QUEUE_SUCCESS_MESSAGE: - break - - overlay_chunk_ids = list() - # recv_data is a single blob so that it contains whole DeltaItem - LOG.debug("%f\trecover one blob" % (time.time())) - delta_item_list = RecoverDeltaProc.from_buffer(recv_data) - for delta_item in delta_item_list: - ret = self.recover_item(delta_item) - if ret is None: - # cannot find self reference point due to the parallel - # compression. 
Save this and do it later - unresolved_deltaitem_list.append(delta_item) - continue - self.process_deltaitem(delta_item, overlay_chunk_ids) - count += 1 - - self.recover_mem_fd.flush() - self.recover_disk_fd.flush() - - LOG.info( - "[Delta] Handle dangling DeltaItem (%d)" % - len(unresolved_deltaitem_list)) - overlay_chunk_ids = list() - for delta_item in unresolved_deltaitem_list: - ret = self.recover_item(delta_item) - if ret is None: - msg = "Cannot find self reference: type(%ld), offset(%ld), index(%ld)" % ( - delta_item.delta_type, delta_item.offset, delta_item.index) - raise StreamSynthesisError(msg) - self.process_deltaitem(delta_item, overlay_chunk_ids) - count += 1 - - self.recover_mem_fd.close() - self.recover_mem_fd = None - self.recover_disk_fd.close() - self.recover_disk_fd = None - time_end = time.time() - - LOG.info("[time] Delta delta %ld chunks, (%s~%s): %s" % - (count, time_start, time_end, (time_end-time_start))) - LOG.info("Finish VM handoff") - - def recover_item(self, delta_item): - if not isinstance(delta_item, DeltaItem): - raise StreamSynthesisError("Need list of DeltaItem") - - if (delta_item.ref_id == DeltaItem.REF_RAW): - recover_data = delta_item.data - elif (delta_item.ref_id == DeltaItem.REF_ZEROS): - recover_data = self.zero_data - elif (delta_item.ref_id == DeltaItem.REF_BASE_MEM): - offset = delta_item.data - recover_data = self.raw_mem[offset:offset+self.chunk_size] - elif (delta_item.ref_id == DeltaItem.REF_BASE_DISK): - offset = delta_item.data - recover_data = self.raw_disk[offset:offset+self.chunk_size] - elif delta_item.ref_id == DeltaItem.REF_SELF: - ref_index = delta_item.data - self_ref_delta_item = self.recovered_delta_dict.get( - ref_index, None) - if self_ref_delta_item is None: - return None - recover_data = self_ref_delta_item.data - elif delta_item.ref_id == DeltaItem.REF_SELF_HASH: - ref_hashvalue = delta_item.data - self_ref_delta_item = self.recovered_hash_dict.get( - ref_hashvalue, None) - if self_ref_delta_item is None: - return None - recover_data = self_ref_delta_item.data - delta_item.hash_value = ref_hashvalue - elif delta_item.ref_id == DeltaItem.REF_XDELTA: - patch_data = delta_item.data - patch_original_size = delta_item.offset_len - if delta_item.delta_type == DeltaItem.DELTA_MEMORY or\ - delta_item.delta_type == DeltaItem.DELTA_MEMORY_LIVE: - base_data = self.raw_mem[ - delta_item.offset:delta_item.offset + patch_original_size] - elif delta_item.delta_type == DeltaItem.DELTA_DISK or\ - delta_item.delta_type == DeltaItem.DELTA_DISK_LIVE: - base_data = self.raw_disk[ - delta_item.offset:delta_item.offset + patch_original_size] - else: - raise StreamSynthesisError( - "Delta type should be either disk or memory") - recover_data = tool.merge_data( - base_data, patch_data, - len(base_data) * 5) - elif delta_item.ref_id == DeltaItem.REF_BSDIFF: - patch_data = delta_item.data - patch_original_size = delta_item.offset_len - if delta_item.delta_type == DeltaItem.DELTA_MEMORY or\ - delta_item.delta_type == DeltaItem.DELTA_MEMORY_LIVE: - base_data = self.raw_mem[ - delta_item.offset:delta_item.offset + patch_original_size] - elif delta_item.delta_type == DeltaItem.DELTA_DISK or\ - delta_item.delta_type == DeltaItem.DELTA_DISK_LIVE: - base_data = self.raw_disk[ - delta_item.offset:delta_item.offset + patch_original_size] - else: - raise DeltaError("Delta type should be either disk or memory") - recover_data = tool.merge_data_bsdiff(base_data, patch_data) - elif delta_item.ref_id == DeltaItem.REF_XOR: - patch_data = delta_item.data - 
patch_original_size = delta_item.offset_len - if delta_item.delta_type == DeltaItem.DELTA_MEMORY or\ - delta_item.delta_type == DeltaItem.DELTA_MEMORY_LIVE: - base_data = self.raw_mem[ - delta_item.offset:delta_item.offset + patch_original_size] - elif delta_item.delta_type == DeltaItem.DELTA_DISK or\ - delta_item.delta_type == DeltaItem.DELTA_DISK_LIVE: - base_data = self.raw_disk[ - delta_item.offset:delta_item.offset + patch_original_size] - else: - raise DeltaError("Delta type should be either disk or memory") - recover_data = tool.cython_xor(base_data, patch_data) - else: - raise StreamSynthesisError( - "Cannot recover: invalid referce id %d" % - delta_item.ref_id) - - if len(recover_data) != delta_item.offset_len: - msg = "Error, Recovered Size Error: %d, %d, ref_id: %s, data_len: %ld, offset: %ld, offset_len: %ld" % \ - (delta_item.delta_type, len(recover_data), delta_item.ref_id, - delta_item.data_len, delta_item.offset, delta_item.offset_len) - print msg - raise StreamSynthesisError(msg) - - # recover - delta_item.ref_id = DeltaItem.REF_RAW - delta_item.data = recover_data - if delta_item.hash_value is None or len(delta_item.hash_value) == 0: - delta_item.hash_value = sha256(recover_data).digest() - - return delta_item - - @staticmethod - def from_buffer(data): - #import yappi - # yappi.start() - cur_offset = 0 - deltaitem_list = list() - while True: - new_item, offset = RecoverDeltaProc.unpack_stream( - data[cur_offset:]) - cur_offset += offset - if len(data) < cur_offset: - break - deltaitem_list.append(new_item) - # yappi.get_func_stats().print_all() - return deltaitem_list - - @staticmethod - def unpack_stream(stream, with_hashvalue=False): - if len(stream) == 0: - return None, 999999 - offset = 0 - live_seq = None - data = stream[0:8+2+1] - data_len = 0 - offset += (8+2+1) - (ram_offset, offset_len, ref_info) = struct.unpack("!QHc", data) - ref_id = ord(ref_info) & 0xF0 - delta_type = ord(ref_info) & 0x0F - - if ref_id == DeltaItem.REF_RAW or \ - ref_id == DeltaItem.REF_XDELTA or \ - ref_id == DeltaItem.REF_XOR or \ - ref_id == DeltaItem.REF_BSDIFF: - data_len = struct.unpack("!Q", stream[offset:offset+8])[0] - offset += 8 - data = stream[offset:offset+data_len] - offset += data_len - elif ref_id == DeltaItem.REF_SELF: - data = struct.unpack("!Q", stream[offset:offset+8])[0] - offset += 8 - elif ref_id == DeltaItem.REF_BASE_DISK or \ - ref_id == DeltaItem.REF_BASE_MEM: - data = struct.unpack("!Q", stream[offset:offset+8])[0] - offset += 8 - elif ref_id == DeltaItem.REF_SELF_HASH: - # print "unpacking ref_self_hash" - data = struct.unpack("!32s", stream[offset:offset+32])[0] - offset += 32 - - if delta_type == DeltaItem.DELTA_DISK_LIVE or\ - delta_type == DeltaItem.DELTA_MEMORY_LIVE: - live_seq = struct.unpack("!H", stream[offset:offset+2])[0] - offset += 2 - - # hash_value typically does not exist when recovered becuase we don't - # need it - if with_hashvalue: - # hash_value is only needed for residue case - hash_value = struct.unpack("!32s", stream[offset:offset+32])[0] - offset += 32 - item = DeltaItem(delta_type, ram_offset, offset_len, - hash_value, ref_id, data_len, - live_seq=live_seq) - else: - item = DeltaItem(delta_type, ram_offset, offset_len, - None, ref_id, data_len, data, - live_seq=live_seq) - return item, offset - - def process_deltaitem(self, delta_item, overlay_chunk_ids): - if len(delta_item.data) != delta_item.offset_len: - msg = "recovered size is not same as page size, %ld != %ld" % \ - (len(delta_item.data), delta_item.offset_len) - raise 
StreamSynthesisError(msg) - - # save it to dictionary to find self_reference easily - self.recovered_delta_dict[delta_item.index] = delta_item - self.recovered_hash_dict[delta_item.hash_value] = delta_item - - # do nothing if the latest memory or disk are already process - prev_iter_item = self.live_migration_iteration_dict.get( - delta_item.index) - if (prev_iter_item is not None): - prev_seq = getattr(prev_iter_item, 'live_seq', 0) - item_seq = getattr(delta_item, 'live_seq', 0) - if prev_seq > item_seq: - msg = "Latest version is already synthesized at %d (%d)" % ( - delta_item.offset, delta_item.delta_type) - LOG.debug(msg) - return - - # write to output file - overlay_chunk_id = long(delta_item.offset/self.chunk_size) - if delta_item.delta_type == DeltaItem.DELTA_MEMORY or\ - delta_item.delta_type == DeltaItem.DELTA_MEMORY_LIVE: - self.recover_mem_fd.seek(delta_item.offset) - self.recover_mem_fd.write(delta_item.data) - overlay_chunk_ids.append( - "%d:%ld" % - (RecoverDeltaProc.FUSE_INDEX_MEMORY, overlay_chunk_id)) - elif delta_item.delta_type == DeltaItem.DELTA_DISK or\ - delta_item.delta_type == DeltaItem.DELTA_DISK_LIVE: - self.recover_disk_fd.seek(delta_item.offset) - self.recover_disk_fd.write(delta_item.data) - overlay_chunk_ids.append( - "%d:%ld" % - (RecoverDeltaProc.FUSE_INDEX_DISK, overlay_chunk_id)) - - # update the latest item for each memory page or disk block - self.live_migration_iteration_dict[delta_item.index] = delta_item - - def finish(self): - self.recovered_delta_dict.clear() - self.recovered_delta_dict = None - self.recovered_hash_dict.clear() - self.recovered_hash_dict = None - self.live_migration_iteration_dict.clear() - self.live_migration_iteration_dict = None - if self.base_disk_fd is not None: - self.base_disk_fd.close() - self.base_disk_fd = None - if self.base_mem_fd is not None: - self.base_mem_fd.close() - self.base_mem_fd = None - if self.raw_disk is not None: - self.raw_disk.close() - self.raw_disk = None - if self.raw_mem is not None: - self.raw_mem.close() - self.raw_mem = None - if self.raw_mem_overlay is not None: - self.raw_mem_overlay.close() - self.raw_mem_overlay = None - - class StreamSynthesisHandler(SocketServer.StreamRequestHandler): synthesis_option = { Protocol.SYNTHESIS_OPTION_DISPLAY_VNC: False, @@ -448,7 +55,7 @@ class StreamSynthesisHandler(SocketServer.StreamRequestHandler): def ret_fail(self, message): LOG.error("%s" % str(message)) message = NetworkUtil.encoding({ - Protocol.KEY_COMMAND: Protocol.MESSAGE_COMMAND_FAIELD, + Protocol.KEY_COMMAND: Protocol.MESSAGE_COMMAND_FAILED, Protocol.KEY_FAILED_REASON: message }) message_size = struct.pack("!I", len(message)) @@ -576,7 +183,7 @@ class StreamSynthesisHandler(SocketServer.StreamRequestHandler): if data is None or len(data) != 4: msg = "Failed to receive first byte of header" raise StreamSynthesisError(msg) - break + blob_header_size = struct.unpack("!I", data)[0] blob_header_raw = self._recv_all(blob_header_size) blob_header = NetworkUtil.decoding(blob_header_raw) @@ -670,7 +277,7 @@ class StreamSynthesisServer(SocketServer.TCPServer): self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) LOG.info("* Server configuration") LOG.info(" - Open TCP Server at %s" % (str(server_address))) - LOG.info(" - Time out for waiting: %d" % (self.timeout)) + LOG.info(" - Time out for waiting: %d" % self.timeout) LOG.info(" - Disable Nagle(No TCP delay) : %s" % str(self.socket.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY))) LOG.info("-"*50) @@ -687,7 +294,6 @@ class 
StreamSynthesisServer(SocketServer.TCPServer): SocketServer.TCPServer.handle_error(self, request, client_address) LOG.error("handling error from client %s\n" % (str(client_address))) LOG.error(traceback.format_exc()) - LOG.error("%s" % str(e)) def handle_timeout(self): LOG.error("timeout error\n") @@ -735,13 +341,6 @@ class StreamSynthesisServer(SocketServer.TCPServer): return ret_list -def sigint_handler(signum, frame): - sys.stdout.write("Exit by user\n") - if server is not None: - server.terminate() - sys.exit(0) - - def main(argv=sys.argv): if not validate_congifuration(): sys.stderr.write("failed to validate configuration\n") diff --git a/elijah/provisioning/compression.py b/elijah/provisioning/compression.py index 4231b96a..ea4e2f1d 100644 --- a/elijah/provisioning/compression.py +++ b/elijah/provisioning/compression.py @@ -510,7 +510,7 @@ def _decomp(self): def decomp_overlay(meta, output_path): meta_dict = msgpack.unpackb(open(meta, "r").read()) - decomp_start_time = time() + decomp_start_time = time.time() comp_overlay_files = meta_dict[Const.META_OVERLAY_FILES] comp_overlay_files = [item [Const.META_OVERLAY_FILE_NAME] @@ -527,7 +527,7 @@ def decomp_overlay(meta, output_path): overlay_file.write(decomp_data) sys.stdout.write( "Overlay decomp time for %d files: %f at %s\n" % - (len(comp_overlay_files), (time()-decomp_start_time), output_path)) + (len(comp_overlay_files), (time.time()-decomp_start_time), output_path)) overlay_file.close() return meta_dict diff --git a/elijah/provisioning/configuration.py b/elijah/provisioning/configuration.py index a81658e1..2f08bdc4 100644 --- a/elijah/provisioning/configuration.py +++ b/elijah/provisioning/configuration.py @@ -44,7 +44,7 @@ def is_exe(fpath): class Const(object): - VERSION = str("0.9.3") + VERSION = str("0.9.4") CLOUDLET_KVM_RELEASE = "https://github.com/cmusatyalab/elijah-qemu/releases" HOME_DIR = os.path.abspath(os.path.expanduser("~")) CONFIGURATION_DIR = os.path.join('/', 'var', 'lib', 'cloudlet', 'conf') diff --git a/elijah/provisioning/server.py b/elijah/provisioning/server.py index 234ef0f4..14beecfd 100644 --- a/elijah/provisioning/server.py +++ b/elijah/provisioning/server.py @@ -413,7 +413,7 @@ class SynthesisHandler(SocketServer.StreamRequestHandler): def ret_fail(self, message): LOG.error("%s" % str(message)) message = NetworkUtil.encoding({ - Protocol.KEY_COMMAND : Protocol.MESSAGE_COMMAND_FAIELD, + Protocol.KEY_COMMAND : Protocol.MESSAGE_COMMAND_FAILED, Protocol.KEY_FAILED_REASON : message }) message_size = struct.pack("!I", len(message)) diff --git a/elijah/provisioning/stream_server.py b/elijah/provisioning/stream_server.py index 48a44dda..9ce2e6df 100644 --- a/elijah/provisioning/stream_server.py +++ b/elijah/provisioning/stream_server.py @@ -19,22 +19,21 @@ # import os -import functools import traceback import sys import time import struct -import Queue import SocketServer import socket -import subprocess +import signal import collections import tempfile import multiprocessing import threading from hashlib import sha256 -import delta +import shutil + from server import NetworkUtil from synthesis_protocol import Protocol as Protocol from synthesis import run_fuse @@ -42,8 +41,6 @@ from synthesis import connect_vnc from handoff import HandoffDataRecv -#import synthesis as synthesis -#from package import VMOverlayPackage from db.api import DBConnector from db.table_def import BaseVM from configuration import Const as Cloudlet_Const @@ -55,61 +52,12 @@ import tool from delta import DeltaItem - LOG = 
logging.getLogger(__name__) session_resources = dict() # dict[session_id] = obj(SessionResource) - -def wrap_process_fault(function): - """Wraps a method to catch exceptions related to instances. - This decorator wraps a method to catch any exceptions and - terminate the request gracefully. - """ - @functools.wraps(function) - def decorated_function(self, *args, **kwargs): - try: - return function(self, *args, **kwargs) - except Exception, e: - if hasattr(self, 'exception_handler'): - self.exception_handler() - kwargs.update(dict(zip(function.func_code.co_varnames[2:], args))) - LOG.error("failed with : %s" % str(kwargs)) - - return decorated_function - - -def try_except(fn): - def wrapped(*args, **kwargs): - try: - return fn(*args, **kwargs) - except Exception, e: - et, ei, tb = sys.exc_info() - raise MyError, MyError(e), tb - return wrapped - - class StreamSynthesisError(Exception): pass - -class AckThread(threading.Thread): - def __init__(self, request): - self.request = request - self.ack_queue = Queue.Queue() - threading.Thread.__init__(self, target=self.start_sending_ack) - - def start_sending_ack(self): - while True: - data = self.ack_queue.get() - bytes_recved= self.ack_queue.get() - # send ack - ack_data = struct.pack("!Q", bytes_recved) - self.request.sendall(ack_data) - - def signal_ack(self): - self.ack_queue.put(bytes_recved) - - class RecoverDeltaProc(multiprocessing.Process): FUSE_INDEX_DISK = 1 FUSE_INDEX_MEMORY = 2 @@ -262,7 +210,7 @@ def recover_item(self, delta_item, delta_counter, delta_times): delta_item.delta_type == DeltaItem.DELTA_DISK_LIVE: base_data = self.raw_disk[delta_item.offset:delta_item.offset+patch_original_size] else: - raise DeltaError("Delta type should be either disk or memory") + raise StreamSynthesisError("Delta type should be either disk or memory") recover_data = tool.merge_data_bsdiff(base_data, patch_data) elif delta_item.ref_id == DeltaItem.REF_XOR: patch_data = delta_item.data @@ -274,7 +222,7 @@ def recover_item(self, delta_item, delta_counter, delta_times): delta_item.delta_type == DeltaItem.DELTA_DISK_LIVE: base_data = self.raw_disk[delta_item.offset:delta_item.offset+patch_original_size] else: - raise DeltaError("Delta type should be either disk or memory") + raise StreamSynthesisError("Delta type should be either disk or memory") recover_data = tool.cython_xor(base_data, patch_data) else: raise StreamSynthesisError("Cannot recover: invalid referce id %d" % delta_item.ref_id) @@ -299,8 +247,6 @@ def recover_item(self, delta_item, delta_counter, delta_times): @staticmethod def from_buffer(data, delta_counter, delta_times): - #import yappi - #yappi.start() offset = 0 deltaitem_list = list() while True: @@ -310,7 +256,6 @@ def from_buffer(data, delta_counter, delta_times): break delta_times['unpack'] += (time.time() - start_time) deltaitem_list.append(new_item) - #yappi.get_func_stats().print_all() return deltaitem_list @staticmethod @@ -453,9 +398,8 @@ def feeding_thread(self): def terminate(self): self.stop.set() - def terminate(self): - self.stop.set() - +def handlesig(signum, frame): + LOG.info("Received signal(%d) to terminate VM..." 
% signum) class StreamSynthesisHandler(SocketServer.StreamRequestHandler): synthesis_option = { @@ -467,7 +411,7 @@ class StreamSynthesisHandler(SocketServer.StreamRequestHandler): def ret_fail(self, message): LOG.error("%s" % str(message)) message = NetworkUtil.encoding({ - Protocol.KEY_COMMAND : Protocol.MESSAGE_COMMAND_FAIELD, + Protocol.KEY_COMMAND : Protocol.MESSAGE_COMMAND_FAILED, Protocol.KEY_FAILED_REASON : message }) message_size = struct.pack("!I", len(message)) @@ -600,7 +544,7 @@ def handle(self): data = self._recv_all(4) if data == None or len(data) != 4: raise StreamSynthesisError("Failed to receive first byte of header") - break + blob_header_size = struct.unpack("!I", data)[0] blob_header_raw = self._recv_all(blob_header_size) blob_header = NetworkUtil.decoding(blob_header_raw) @@ -687,7 +631,11 @@ def handle(self): LOG.info("finished") if self.server.handoff_data == None: - connect_vnc(synthesized_vm.machine) + connect_vnc(synthesized_vm.machine, True) + + signal.signal(signal.SIGUSR1, handlesig) + signal.pause() + synthesized_vm.monitor.terminate() synthesized_vm.monitor.join() synthesized_vm.terminate() @@ -771,7 +719,6 @@ def handle_error(self, request, client_address): SocketServer.TCPServer.handle_error(self, request, client_address) sys.stderr.write("handling error from client %s\n" % (str(client_address))) sys.stderr.write(traceback.format_exc()) - sys.stderr.write("%s" % str(e)) def handle_timeout(self): sys.stderr.write("timeout error\n") diff --git a/elijah/provisioning/synthesis_protocol.py b/elijah/provisioning/synthesis_protocol.py index 96ac92a0..cb82e42f 100644 --- a/elijah/provisioning/synthesis_protocol.py +++ b/elijah/provisioning/synthesis_protocol.py @@ -35,7 +35,7 @@ class Protocol(object): MESSAGE_COMMAND_SESSION_CLOSE = 0x16 # server -> client as return MESSAGE_COMMAND_SUCCESS = 0x01 - MESSAGE_COMMAND_FAIELD = 0x02 + MESSAGE_COMMAND_FAILED = 0x02 # server -> client as command MESSAGE_COMMAND_ON_DEMAND = 0x03 MESSAGE_COMMAND_SYNTHESIS_DONE = 0x04
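
Note on the new termination path (a sketch, not part of the patch itself): with
this change, stream_server.py installs handlesig() as the SIGUSR1 handler and
then blocks in signal.pause() once synthesis has finished; when the signal
arrives, pause() returns and the server proceeds to tear down the synthesized
VM and its monitor. A minimal sketch of how a controlling process might
trigger that teardown, assuming it knows the stream server's PID
(terminate_streamed_vm is a hypothetical helper name, not part of this change):

    import os
    import signal

    def terminate_streamed_vm(server_pid):
        # Deliver SIGUSR1 to the stream-server process. Its handlesig()
        # handler logs the signal, signal.pause() returns, and the server
        # then terminates the synthesized VM and its monitor thread.
        os.kill(server_pid, signal.SIGUSR1)

The same effect is available from a shell with "kill -USR1 <server pid>".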
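
For reference, handoff-server-proc now imports RecoverDeltaProc from
elijah.provisioning.stream_server instead of carrying its own copy. Each
DeltaItem that RecoverDeltaProc.unpack_stream() decodes starts with an 11-byte
big-endian header: ram_offset (8 bytes), offset_len (2 bytes), and ref_info
(1 byte), whose high nibble is the reference type and low nibble the delta
type. A minimal round-trip sketch (Python 2, matching the codebase; the nibble
values below are illustrative placeholders, not the real DeltaItem constants):

    import struct

    ram_offset, offset_len = 4096, 4096   # illustrative values
    ref_id, delta_type = 0x30, 0x01       # illustrative nibbles
    header = struct.pack("!QHc", ram_offset, offset_len,
                         chr(ref_id | delta_type))
    assert len(header) == 8 + 2 + 1       # matches unpack_stream's read size

    _off, _olen, info = struct.unpack("!QHc", header)
    assert (ord(info) & 0xF0, ord(info) & 0x0F) == (ref_id, delta_type)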