diff --git a/characters.py b/characters.py index a7c9200..45b89de 100644 --- a/characters.py +++ b/characters.py @@ -5,29 +5,31 @@ DIR = "wasd" DEFAULT_LIVES = 3 + def vector2dir(vx, vy): m = max(abs(vx), abs(vy)) if m == abs(vx): if vx < 0: - d = 1 #a + d = 1 # a else: - d = 3 #d + d = 3 # d else: if vy > 0: - d = 2 #s + d = 2 # s else: - d = 0 #w + d = 0 # w return d + class Character: def __init__(self, x=1, y=1): self._pos = x, y self._spawn_pos = self._pos - + @property def pos(self): return self._pos - + @pos.setter def pos(self, value): self._pos = value @@ -49,12 +51,11 @@ def __init__(self, pos, lives=DEFAULT_LIVES): super().__init__(x=pos[0], y=pos[1]) self._lives = lives self._powers = [] - + def to_dict(self): - return {"pos": self.pos, - "lives": self._lives} + return {"pos": self.pos, "lives": self._lives} - @property + @property def powers(self): return self._powers @@ -66,7 +67,7 @@ def flames(self): return len([p for p in self._powers if p == Powerups.Flames]) def kill(self): - self._lives-=1 + self._lives -= 1 def powerup(self, _type): self._powers.append(_type) @@ -88,10 +89,10 @@ def __init__(self, pos, name, points, speed, smart, wallpass): self.wander = 0 super().__init__(*pos) - + def __str__(self): return f"{self._name}" - + def points(self): return self._points @@ -100,10 +101,12 @@ def move(self, mapa, bomberman, bombs): return if self._smart == Smart.LOW: - new_pos = mapa.calc_pos(self.pos, self.dir[self.lastdir]) #don't bump into stones/walls + new_pos = mapa.calc_pos( + self.pos, self.dir[self.lastdir] + ) # don't bump into stones/walls if new_pos == self.pos: self.lastdir = (self.lastdir + 1) % len(self.dir) - + elif self._smart == Smart.NORMAL: b_x, b_y = bomberman.pos o_x, o_y = self.pos @@ -113,7 +116,7 @@ def move(self, mapa, bomberman, bombs): else: direction = vector2dir(b_x - o_x, b_y - o_y) - new_pos = mapa.calc_pos(self.pos, self.dir[direction]) #chase bomberman + new_pos = mapa.calc_pos(self.pos, self.dir[direction]) # chase bomberman if new_pos == self.pos: self.lastdir = (direction + random.choice([-1, 1])) % len(self.dir) self.wander = 3 @@ -122,9 +125,9 @@ def move(self, mapa, bomberman, bombs): self.wander -= 1 else: self.lastdir = None - + elif self._smart == Smart.HIGH: - #TODO + # TODO pass self.pos = new_pos @@ -135,27 +138,31 @@ def ready(self): self.step = 0 return True return False - + + class Balloom(Enemy): def __init__(self, pos): - super().__init__(pos, self.__class__.__name__, - 100, Speed.SLOW, Smart.LOW, False) + super().__init__( + pos, self.__class__.__name__, 100, Speed.SLOW, Smart.LOW, False + ) class Oneal(Enemy): def __init__(self, pos): - super().__init__(pos, self.__class__.__name__, - 200, Speed.SLOWEST, Smart.NORMAL, False) + super().__init__( + pos, self.__class__.__name__, 200, Speed.SLOWEST, Smart.NORMAL, False + ) class Doll(Enemy): def __init__(self, pos): - super().__init__(pos, self.__class__.__name__, - 400, Speed.NORMAL, Smart.LOW, False) + super().__init__( + pos, self.__class__.__name__, 400, Speed.NORMAL, Smart.LOW, False + ) class Minvo(Enemy): def __init__(self, pos): - super().__init__(pos, self.__class__.__name__, - 800, Speed.FAST, Smart.NORMAL, False) - + super().__init__( + pos, self.__class__.__name__, 800, Speed.FAST, Smart.NORMAL, False + ) diff --git a/client.py b/client.py index 4a80e9c..c4d2dd0 100644 --- a/client.py +++ b/client.py @@ -7,57 +7,63 @@ from mapa import Map -#Next 2 lines are not needed for AI agent +# Next 2 lines are not needed for AI agent import pygame + pygame.init() -async def agent_loop(server_address = "localhost:8000", agent_name="student"): + +async def agent_loop(server_address="localhost:8000", agent_name="student"): async with websockets.connect(f"ws://{server_address}/player") as websocket: - # Receive information about static game properties + # Receive information about static game properties await websocket.send(json.dumps({"cmd": "join", "name": agent_name})) msg = await websocket.recv() - game_properties = json.loads(msg) - + game_properties = json.loads(msg) + # You can create your own map representation or use the game representation: - mapa = Map(size = game_properties['size'], mapa = game_properties['map']) + mapa = Map(size=game_properties["size"], mapa=game_properties["map"]) - #Next 3 lines are not needed for AI agent + # Next 3 lines are not needed for AI agent SCREEN = pygame.display.set_mode((299, 123)) SPRITES = pygame.image.load("data/pad.png").convert_alpha() SCREEN.blit(SPRITES, (0, 0)) while True: try: - state = json.loads(await websocket.recv()) #receive game state, this must be called timely or your game will get out of sync with the server + state = json.loads( + await websocket.recv() + ) # receive game state, this must be called timely or your game will get out of sync with the server - #Next lines are only for the Human Agent, the key values are nonetheless the correct ones! + # Next lines are only for the Human Agent, the key values are nonetheless the correct ones! key = "" for event in pygame.event.get(): - if event.type == pygame.QUIT or not state['lives']: + if event.type == pygame.QUIT or not state["lives"]: pygame.quit() if event.type == pygame.KEYDOWN: if event.key == pygame.K_UP: - key = 'w' + key = "w" elif event.key == pygame.K_LEFT: - key = 'a' + key = "a" elif event.key == pygame.K_DOWN: - key = 's' + key = "s" elif event.key == pygame.K_RIGHT: - key = 'd' + key = "d" elif event.key == pygame.K_a: - key = 'A' + key = "A" elif event.key == pygame.K_b: - key = 'B' + key = "B" - await websocket.send(json.dumps({"cmd": "key", "key": key})) #send key command to server - you must implement this send in the AI agent + await websocket.send( + json.dumps({"cmd": "key", "key": key}) + ) # send key command to server - you must implement this send in the AI agent break except websockets.exceptions.ConnectionClosedOK: print("Server has cleanly disconnected us") return - #Next line is not needed for AI agent + # Next line is not needed for AI agent pygame.display.flip() @@ -65,7 +71,7 @@ async def agent_loop(server_address = "localhost:8000", agent_name="student"): # You can change the default values using the command line, example: # $ NAME='bombastico' python3 client.py loop = asyncio.get_event_loop() -SERVER = os.environ.get('SERVER', 'localhost') -PORT = os.environ.get('PORT', '8000') -NAME = os.environ.get('NAME', getpass.getuser()) +SERVER = os.environ.get("SERVER", "localhost") +PORT = os.environ.get("PORT", "8000") +NAME = os.environ.get("NAME", getpass.getuser()) loop.run_until_complete(agent_loop(f"{SERVER}:{PORT}", NAME)) diff --git a/game.py b/game.py index f6dfe84..46e0446 100644 --- a/game.py +++ b/game.py @@ -10,35 +10,36 @@ from consts import Powerups from mapa import Map, Tiles -logger = logging.getLogger('Game') +logger = logging.getLogger("Game") logger.setLevel(logging.DEBUG) LIVES = 3 INITIAL_SCORE = 0 -TIMEOUT = 3000 +TIMEOUT = 3000 GAME_SPEED = 10 MIN_BOMB_RADIUS = 3 LEVEL_ENEMIES = { - 1: [Balloom]*6, - 2: [Balloom]*3 + [Oneal]*3, - 3: [Balloom]*2 + [Oneal]*2 + [Doll]*2, - 4: [Balloom] + [Oneal] + [Doll]*2 + [Minvo]*2, - 5: [Oneal]*4 + [Doll]*3, - } + 1: [Balloom] * 6, + 2: [Balloom] * 3 + [Oneal] * 3, + 3: [Balloom] * 2 + [Oneal] * 2 + [Doll] * 2, + 4: [Balloom] + [Oneal] + [Doll] * 2 + [Minvo] * 2, + 5: [Oneal] * 4 + [Doll] * 3, +} LEVEL_POWERUPS = { - 1: Powerups.Flames, - 2: Powerups.Bombs, - 3: Powerups.Detonator, - 4: Powerups.Speed, - 5: Powerups.Bombs + 1: Powerups.Flames, + 2: Powerups.Bombs, + 3: Powerups.Detonator, + 4: Powerups.Speed, + 5: Powerups.Bombs, } + class Bomb: def __init__(self, pos, mapa, radius, detonator=False): self._pos = pos - self._timeout = radius+1 #TODO fine tune + self._timeout = radius + 1 # TODO fine tune self._radius = radius self._detonator = detonator self._map = mapa @@ -61,7 +62,7 @@ def radius(self): def update(self): if not self._detonator: - self._timeout-=1/2 + self._timeout -= 1 / 2 def exploded(self): return not self._timeout > 0 @@ -74,18 +75,18 @@ def in_range(self, character): cx, cy = character if by == cy: - for r in range(self._radius+1): + for r in range(self._radius + 1): if self._map.is_stone((bx + r, by)): - break #protected by stone + break # protected by stone if (cx, cy) == (bx + r, by) or (cx, cy) == (bx - r, by): return True if bx == cx: - for r in range(self._radius+1): + for r in range(self._radius + 1): if self._map.is_stone((bx, by + r)): - break #protected by stone + break # protected by stone if (cx, cy) == (bx, by + r) or (cx, cy) == (bx, by - r): return True - + return False def __repr__(self): @@ -104,16 +105,15 @@ def __init__(self, level=1, lives=LIVES, timeout=TIMEOUT): self.map = Map() self._enemies = [] - - def info(self): - return {"size": self.map.size, - "map": self.map.map, - "fps": GAME_SPEED, - "timeout": TIMEOUT, - "lives": LIVES, - "score": self.score, - } + return { + "size": self.map.size, + "map": self.map.map, + "fps": GAME_SPEED, + "timeout": TIMEOUT, + "lives": LIVES, + "score": self.score, + } @property def running(self): @@ -127,18 +127,17 @@ def start(self, player_name): logger.debug("Reset world") self._player_name = player_name self._running = True - self._score = INITIAL_SCORE + self._score = INITIAL_SCORE self.next_level(self.initial_level) - #TODO REMOVE: + # TODO REMOVE: self._bomberman.powers.append(Powerups.Detonator) self._bomberman.powers.append(Powerups.Bombs) - def stop(self): logger.info("GAME OVER") self._running = False - + def next_level(self, level): if level > len(LEVEL_ENEMIES): logger.info("You WIN!") @@ -153,9 +152,11 @@ def next_level(self, level): self._powerups = [] self._bonus = [] self._exit = [] - self._lastkeypress = "" + self._lastkeypress = "" self._bomb_radius = 3 - self._enemies = [t(p) for t, p in zip(LEVEL_ENEMIES[level], self.map.enemies_spawn)] + self._enemies = [ + t(p) for t, p in zip(LEVEL_ENEMIES[level], self.map.enemies_spawn) + ] def quit(self): logger.debug("Quit") @@ -167,35 +168,45 @@ def keypress(self, key): def update_bomberman(self): try: if self._lastkeypress.isupper(): - #Parse action - if self._lastkeypress == 'A' and len(self._bombs) > 0: - self._bombs[0].detonate() #always detonate the oldest bomb - elif self._lastkeypress == 'B' and len(self._bombs) < self._bomberman.powers.count(Powerups.Bombs) + 1: - self._bombs.append(Bomb(self._bomberman.pos, - self.map, - MIN_BOMB_RADIUS+self._bomberman.flames(), - detonator= Powerups.Detonator in self._bomberman.powers - ) - ) # must be dependent of powerup + # Parse action + if self._lastkeypress == "A" and len(self._bombs) > 0: + self._bombs[0].detonate() # always detonate the oldest bomb + elif ( + self._lastkeypress == "B" + and len(self._bombs) + < self._bomberman.powers.count(Powerups.Bombs) + 1 + ): + self._bombs.append( + Bomb( + self._bomberman.pos, + self.map, + MIN_BOMB_RADIUS + self._bomberman.flames(), + detonator=Powerups.Detonator in self._bomberman.powers, + ) + ) # must be dependent of powerup else: - #Update position - new_pos = self.map.calc_pos(self._bomberman.pos, self._lastkeypress) #don't bump into stones/walls - if new_pos not in [b.pos for b in self._bombs]: #don't pass over bombs + # Update position + new_pos = self.map.calc_pos( + self._bomberman.pos, self._lastkeypress + ) # don't bump into stones/walls + if new_pos not in [b.pos for b in self._bombs]: # don't pass over bombs self._bomberman.pos = new_pos - for pos, _type in self._powerups: #consume powerups + for pos, _type in self._powerups: # consume powerups if new_pos == pos: self._bomberman.powerup(_type) self._powerups.remove((pos, _type)) except AssertionError: - logger.error("Invalid key <%s> pressed. Valid keys: w,a,s,d A B", self._lastkeypress) + logger.error( + "Invalid key <%s> pressed. Valid keys: w,a,s,d A B", self._lastkeypress + ) finally: - self._lastkeypress = "" #remove inertia + self._lastkeypress = "" # remove inertia if len(self._enemies) == 0 and self._bomberman.pos == self._exit: logger.info(f"Level {self.map.level} completed") - self._score += (self._timeout - self._step) - self.next_level(self.map.level+1) + self._score += self._timeout - self._step + self.next_level(self.map.level + 1) def kill_bomberman(self): logger.info(f"bomberman has died on step: {self._step}") @@ -228,7 +239,9 @@ def explode_bomb(self): if self.map.exit_door == wall: self._exit = wall if self.map.powerup == wall: - self._powerups.append((wall, LEVEL_POWERUPS[self.map.level])) + self._powerups.append( + (wall, LEVEL_POWERUPS[self.map.level]) + ) for enemy in self._enemies[:]: if bomb.in_range(enemy): @@ -239,7 +252,7 @@ def explode_bomb(self): self._bombs.remove(bomb) async def next_frame(self): - await asyncio.sleep(1./GAME_SPEED) + await asyncio.sleep(1.0 / GAME_SPEED) if not self._running: logger.info("Waiting for player 1") @@ -250,32 +263,37 @@ async def next_frame(self): self.stop() if self._step % 100 == 0: - logger.debug(f"[{self._step}] SCORE {self._score} - LIVES {self._bomberman.lives}") + logger.debug( + f"[{self._step}] SCORE {self._score} - LIVES {self._bomberman.lives}" + ) - self.explode_bomb() + self.explode_bomb() self.update_bomberman() - if self._step % (self._bomberman.powers.count(Powerups.Speed)+1) == 0: #increase speed of bomberman by moving enemies less often + if ( + self._step % (self._bomberman.powers.count(Powerups.Speed) + 1) == 0 + ): # increase speed of bomberman by moving enemies less often for enemy in self._enemies: enemy.move(self.map, self._bomberman, self._bombs) self.collision() - self._state = {"level": self.map.level, - "step": self._step, - "timeout": self._timeout, - "player": self._player_name, - "score": self._score, - "lives": self._bomberman.lives, - "bomberman": self._bomberman.pos, - "bombs": [(b.pos, b.timeout, b.radius) for b in self._bombs], - "enemies": [{'name': str(e), 'pos': e.pos} for e in self._enemies], - "walls": self.map.walls, - "powerups": [(p, Powerups(n).name) for p, n in self._powerups], - "bonus": self._bonus, - "exit": self._exit, - } + self._state = { + "level": self.map.level, + "step": self._step, + "timeout": self._timeout, + "player": self._player_name, + "score": self._score, + "lives": self._bomberman.lives, + "bomberman": self._bomberman.pos, + "bombs": [(b.pos, b.timeout, b.radius) for b in self._bombs], + "enemies": [{"name": str(e), "pos": e.pos} for e in self._enemies], + "walls": self.map.walls, + "powerups": [(p, Powerups(n).name) for p, n in self._powerups], + "bonus": self._bonus, + "exit": self._exit, + } @property def state(self): - #logger.debug(self._state) + # logger.debug(self._state) return json.dumps(self._state) diff --git a/mapa.py b/mapa.py index b20f1f7..53b6965 100644 --- a/mapa.py +++ b/mapa.py @@ -3,16 +3,19 @@ import random from enum import IntEnum -logger = logging.getLogger('Map') +logger = logging.getLogger("Map") logger.setLevel(logging.DEBUG) + class Tiles(IntEnum): PASSAGE = 0 STONE = 1 WALL = 2 + VITAL_SPACE = 3 + class Map: def __init__(self, level=1, enemies=8, size=(51, 31), mapa=None): self._level = level @@ -20,51 +23,60 @@ def __init__(self, level=1, enemies=8, size=(51, 31), mapa=None): self.hor_tiles = size[0] self.ver_tiles = size[1] self._walls = [] - self._enemies_spawn = [] - + self._enemies_spawn = [] + if not mapa: logger.info("Generating a MAP") self.map = [[Tiles.PASSAGE] * self.ver_tiles for i in range(self.hor_tiles)] for x in range(self.hor_tiles): for y in range(self.ver_tiles): - if x in [0, self.hor_tiles-1] or y in [0, self.ver_tiles-1]: + if x in [0, self.hor_tiles - 1] or y in [0, self.ver_tiles - 1]: + self.map[x][y] = Tiles.STONE + elif x % 2 == 0 and y % 2 == 0: self.map[x][y] = Tiles.STONE - elif x%2 == 0 and y%2 == 0: - self.map[x][y] = Tiles.STONE - elif x >= VITAL_SPACE and y >= VITAL_SPACE: #give bomberman some room - if random.randint(0,100) > 70 + 25/level: + elif ( + x >= VITAL_SPACE and y >= VITAL_SPACE + ): # give bomberman some room + if random.randint(0, 100) > 70 + 25 / level: self.map[x][y] = Tiles.WALL self._walls.append((x, y)) - + for _ in range(enemies): x, y = 0, 0 - while self.map[x][y] in [Tiles.STONE, Tiles.WALL]: #find empty spots to place enemies - x, y = random.randrange(VITAL_SPACE, self.hor_tiles), random.randrange(VITAL_SPACE, self.ver_tiles) + while self.map[x][y] in [ + Tiles.STONE, + Tiles.WALL, + ]: # find empty spots to place enemies + x, y = ( + random.randrange(VITAL_SPACE, self.hor_tiles), + random.randrange(VITAL_SPACE, self.ver_tiles), + ) self._enemies_spawn.append((x, y)) # create a vital space for enemies: - for rx, ry in [(x, y) for x in [-1,0,1] for y in [-1,0,1]]: - if self.map[x+rx][y+ry] in [Tiles.WALL]: - self.map[x+rx][y+ry] = Tiles.PASSAGE - self._walls.remove((x+rx, y+ry)) + for rx, ry in [(x, y) for x in [-1, 0, 1] for y in [-1, 0, 1]]: + if self.map[x + rx][y + ry] in [Tiles.WALL]: + self.map[x + rx][y + ry] = Tiles.PASSAGE + self._walls.remove((x + rx, y + ry)) self.exit_door = random.choice(self._walls) - self.powerup = random.choice([w for w in self._walls if w != self.exit_door]) #hide powerups behind walls only - + self.powerup = random.choice( + [w for w in self._walls if w != self.exit_door] + ) # hide powerups behind walls only else: logger.info("Loading MAP") self.map = mapa for x in range(self.hor_tiles): for y in range(self.ver_tiles): - if self.map[x][y] == Tiles.WALL and (x, y) != (1,1): + if self.map[x][y] == Tiles.WALL and (x, y) != (1, 1): self._walls.append((x, y)) - self._bomberman_spawn = (1,1) #Always true - + self._bomberman_spawn = (1, 1) # Always true + def __getstate__(self): return self.map - + def __setstate__(self, state): - self.map = state + self.map = state @property def size(self): @@ -114,18 +126,17 @@ def calc_pos(self, cur, direction, wallpass=False): cx, cy = cur npos = cur - if direction == 'w': - npos = cx, cy-1 - if direction == 'a': - npos = cx-1, cy - if direction == 's': - npos = cx, cy+1 - if direction == 'd': - npos = cx+1, cy - - #test blocked + if direction == "w": + npos = cx, cy - 1 + if direction == "a": + npos = cx - 1, cy + if direction == "s": + npos = cx, cy + 1 + if direction == "d": + npos = cx + 1, cy + + # test blocked if self.is_blocked(npos, wallpass=wallpass): return cur - + return npos - diff --git a/server.py b/server.py index 755959b..b4e760c 100644 --- a/server.py +++ b/server.py @@ -9,41 +9,46 @@ from collections import namedtuple from game import Game -logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') -wslogger = logging.getLogger('websockets') +logging.basicConfig( + level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) +wslogger = logging.getLogger("websockets") wslogger.setLevel(logging.WARN) -logger = logging.getLogger('Server') +logger = logging.getLogger("Server") logger.setLevel(logging.INFO) -Player = namedtuple('Player', ['name', 'ws']) +Player = namedtuple("Player", ["name", "ws"]) MAX_HIGHSCORES = 10 HIGHSCORE_FILE = "highscores.json" + class Game_server: def __init__(self, level, lives, timeout, grading): self.game = Game(level, lives, timeout) self.players = asyncio.Queue() self.viewers = set() - self.current_player = None + self.current_player = None self.grading = grading self.game_info = "{}" - self._highscores = [] + self._highscores = [] if os.path.isfile(HIGHSCORE_FILE): - with open(HIGHSCORE_FILE, 'r') as infile: + with open(HIGHSCORE_FILE, "r") as infile: self._highscores = json.load(infile) - + def save_highscores(self): - #update highscores + # update highscores logger.debug("Save highscores") logger.info("FINAL SCORE <%s>: %s", self.current_player.name, self.game.score) self._highscores.append((self.current_player.name, self.game.score)) - self._highscores = sorted(self._highscores, key=lambda s: -1*s[1])[:MAX_HIGHSCORES] - - with open(HIGHSCORE_FILE, 'w') as outfile: + self._highscores = sorted(self._highscores, key=lambda s: -1 * s[1])[ + :MAX_HIGHSCORES + ] + + with open(HIGHSCORE_FILE, "w") as outfile: json.dump(self._highscores, outfile) async def incomming_handler(self, websocket, path): @@ -54,7 +59,7 @@ async def incomming_handler(self, websocket, path): self.game_info = self.game.info() self.game_info["highscores"] = self._highscores await websocket.send(json.dumps(self.game_info)) - + if path == "/player": logger.info("<%s> has joined", data["name"]) await self.players.put(Player(data["name"], websocket)) @@ -78,53 +83,68 @@ async def mainloop(self): while True: logger.info("Waiting for players") self.current_player = await self.players.get() - + if self.current_player.ws.closed: logger.error(f"<{self.current_player.name}> disconnect while waiting") continue - + try: logger.info(f"Starting game for <{self.current_player.name}>") self.game.start(self.current_player.name) if self.viewers: - await asyncio.wait([client.send(json.dumps(self.game_info)) for client in self.viewers]) + await asyncio.wait( + [ + client.send(json.dumps(self.game_info)) + for client in self.viewers + ] + ) if self.grading: game_rec = dict() - game_rec['player'] = self.current_player.name - + game_rec["player"] = self.current_player.name + while self.game.running: await self.game.next_frame() await self.current_player.ws.send(self.game.state) if self.viewers: - await asyncio.wait([client.send(self.game.state) for client in self.viewers]) + await asyncio.wait( + [client.send(self.game.state) for client in self.viewers] + ) self.save_highscores() - await self.current_player.ws.send(json.dumps({"score": self.game.score})) + await self.current_player.ws.send( + json.dumps({"score": self.game.score}) + ) logger.info(f"Disconnecting <{self.current_player.name}>") except websockets.exceptions.ConnectionClosed: self.current_player = None finally: - try: - if self.grading: - game_rec['score'] = self.game.score - game_rec['level'] = self.game.map.level - requests.post(self.grading, json=game_rec) + try: + if self.grading: + game_rec["score"] = self.game.score + game_rec["level"] = self.game.map.level + requests.post(self.grading, json=game_rec) except: logger.warn("Could not save score to server") if self.current_player: await self.current_player.ws.close() - + if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--bind", help="IP address to bind to", default="") parser.add_argument("--port", help="TCP port", type=int, default=8000) parser.add_argument("--level", help="start on level", type=int, default=1) parser.add_argument("--lives", help="Number of lives", type=int, default=3) - parser.add_argument("--timeout", help="Timeout after this amount of steps", type=int, default=3000) - parser.add_argument("--grading-server", help="url of grading server", default='http://bomberman-aulas.5g.cn.atnog.av.it.pt/game') + parser.add_argument( + "--timeout", help="Timeout after this amount of steps", type=int, default=3000 + ) + parser.add_argument( + "--grading-server", + help="url of grading server", + default="http://bomberman-aulas.5g.cn.atnog.av.it.pt/game", + ) args = parser.parse_args() g = Game_server(args.level, args.lives, args.timeout, args.grading_server) @@ -136,4 +156,3 @@ async def mainloop(self): loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.gather(websocket_server, game_loop_task)) loop.close() - diff --git a/viewer.py b/viewer.py index 7425831..16fddd8 100644 --- a/viewer.py +++ b/viewer.py @@ -12,38 +12,92 @@ from mapa import Map, Tiles logging.basicConfig(level=logging.DEBUG) -logger_websockets = logging.getLogger('websockets') +logger_websockets = logging.getLogger("websockets") logger_websockets.setLevel(logging.WARN) -logger = logging.getLogger('Map') +logger = logging.getLogger("Map") logger.setLevel(logging.DEBUG) -BOMBERMAN = {'up': (3*16, 1*16), 'left': (0, 0), 'down': (3*16, 0), 'right': (0, 1*16)} -BALLOOM = {'up': (0, 15*16), 'left': (16, 15*16), 'down': (2*16, 15*16), 'right': (3*16, 15*16)} -ONEAL = {'up': (0, 16*16), 'left': (16, 16*16), 'down': (2*16, 16*16), 'right': (3*16, 16*16)} -DOLL = {'up': (0, 17*16), 'left': (16, 17*16), 'down': (2*16, 17*16), 'right': (3*16, 17*16)} -MINVO = {'up': (0, 18*16), 'left': (16, 18*16), 'down': (2*16, 18*16), 'right': (3*16, 18*16)} -ENEMIES = {'Balloom': BALLOOM, 'Oneal': ONEAL, 'Doll': DOLL, 'Minvo': MINVO} -POWERUPS = {'Bombs': (0, 14*16), 'Flames': (1*16, 14*16)} +BOMBERMAN = { + "up": (3 * 16, 1 * 16), + "left": (0, 0), + "down": (3 * 16, 0), + "right": (0, 1 * 16), +} +BALLOOM = { + "up": (0, 15 * 16), + "left": (16, 15 * 16), + "down": (2 * 16, 15 * 16), + "right": (3 * 16, 15 * 16), +} +ONEAL = { + "up": (0, 16 * 16), + "left": (16, 16 * 16), + "down": (2 * 16, 16 * 16), + "right": (3 * 16, 16 * 16), +} +DOLL = { + "up": (0, 17 * 16), + "left": (16, 17 * 16), + "down": (2 * 16, 17 * 16), + "right": (3 * 16, 17 * 16), +} +MINVO = { + "up": (0, 18 * 16), + "left": (16, 18 * 16), + "down": (2 * 16, 18 * 16), + "right": (3 * 16, 18 * 16), +} +ENEMIES = {"Balloom": BALLOOM, "Oneal": ONEAL, "Doll": DOLL, "Minvo": MINVO} +POWERUPS = {"Bombs": (0, 14 * 16), "Flames": (1 * 16, 14 * 16)} STONE = (48, 48) WALL = (64, 48) PASSAGE = (0, 64) -EXIT = (11*16, 3*16) +EXIT = (11 * 16, 3 * 16) BOMB = [(32, 48), (16, 48), (0, 48)] -EXPLOSION = {'c': (112,96), 'l': (96, 96), 'r': (128, 96), 'u': (112,80), 'd': (112, 112), - 'xl': (80, 96), 'xr': (144, 96), 'xu': (112,64), 'xd': (112, 128) } -FALLOUT = {'c': (32, 96)} +EXPLOSION = { + "c": (112, 96), + "l": (96, 96), + "r": (128, 96), + "u": (112, 80), + "d": (112, 112), + "xl": (80, 96), + "xr": (144, 96), + "xu": (112, 64), + "xd": (112, 128), +} +FALLOUT = {"c": (32, 96)} CHAR_LENGTH = 16 -CHAR_SIZE= CHAR_LENGTH, CHAR_LENGTH -SCALE = 1 - -COLORS = {'white':(255,255,255), 'red':(255,0,0), 'pink':(255,105,180), 'blue':(135,206,235), 'orange':(255,165,0), 'yellow':(255,255,0), 'grey': (120,120,120)} +CHAR_SIZE = CHAR_LENGTH, CHAR_LENGTH +SCALE = 1 + +COLORS = { + "white": (255, 255, 255), + "red": (255, 0, 0), + "pink": (255, 105, 180), + "blue": (135, 206, 235), + "orange": (255, 165, 0), + "yellow": (255, 255, 0), + "grey": (120, 120, 120), +} BACKGROUND = (0, 0, 0) -RANKS = {1:"1ST", 2:"2ND", 3:"3RD", 4:"4TH", 5:"5TH", 6:"6TH", 7:"7TH", 8:"8TH", 9:"9TH", 10:"10TH"} +RANKS = { + 1: "1ST", + 2: "2ND", + 3: "3RD", + 4: "4TH", + 5: "5TH", + 6: "6TH", + 7: "7TH", + 8: "8TH", + 9: "9TH", + 10: "10TH", +} SPRITES = None + async def messages_handler(ws_path, queue): async with websockets.connect(ws_path) as websocket: await websocket.send(json.dumps({"cmd": "join"})) @@ -52,31 +106,33 @@ async def messages_handler(ws_path, queue): r = await websocket.recv() queue.put_nowait(r) + class GameOver(BaseException): pass + class Artifact(pygame.sprite.Sprite): def __init__(self, *args, **kw): - self.x, self.y = None, None #postpone to update_sprite() + self.x, self.y = None, None # postpone to update_sprite() - x, y = (kw.pop("pos", ((kw.pop("x", 0), kw.pop("y", 0))))) + x, y = kw.pop("pos", ((kw.pop("x", 0), kw.pop("y", 0)))) new_pos = scale((x, y)) self.image = pygame.Surface(CHAR_SIZE) self.rect = pygame.Rect(new_pos + CHAR_SIZE) self.update_sprite((x, y)) super().__init__() - def update_sprite(self, pos = None): + def update_sprite(self, pos=None): if not pos: pos = self.x, self.y else: pos = scale(pos) self.rect = pygame.Rect(pos + CHAR_SIZE) - self.image.fill((0,0,230)) + self.image.fill((0, 0, 230)) self.image.blit(*self.sprite) - #self.image = pygame.transform.scale(self.image, scale((1, 1))) + # self.image = pygame.transform.scale(self.image, scale((1, 1))) self.x, self.y = pos - + def update(self, *args): self.update_sprite() @@ -84,7 +140,7 @@ def update(self, *args): class BomberMan(Artifact): def __init__(self, *args, **kw): self.direction = "left" - self.sprite = (SPRITES, (0,0), (*BOMBERMAN[self.direction], *scale((1,1)))) + self.sprite = (SPRITES, (0, 0), (*BOMBERMAN[self.direction], *scale((1, 1)))) super().__init__(*args, **kw) def update(self, new_pos): @@ -99,7 +155,7 @@ def update(self, new_pos): if y < self.y: self.direction = "up" - self.sprite = (SPRITES, (0,0), (*BOMBERMAN[self.direction], *scale((1,1)))) + self.sprite = (SPRITES, (0, 0), (*BOMBERMAN[self.direction], *scale((1, 1)))) self.update_sprite(tuple(new_pos)) @@ -107,9 +163,13 @@ class Enemy(Artifact): def __init__(self, *args, **kw): self.direction = "left" self.name = kw.pop("name") - self.sprite = (SPRITES, (0,0), (*ENEMIES[self.name][self.direction], *scale((1,1)))) + self.sprite = ( + SPRITES, + (0, 0), + (*ENEMIES[self.name][self.direction], *scale((1, 1))), + ) super().__init__(*args, **kw) - + def update(self, new_pos): x, y = scale(new_pos) @@ -122,14 +182,18 @@ def update(self, new_pos): if y < self.y: self.direction = "up" - self.sprite = (SPRITES, (0,0), (*ENEMIES[self.name][self.direction], *scale((1,1)))) + self.sprite = ( + SPRITES, + (0, 0), + (*ENEMIES[self.name][self.direction], *scale((1, 1))), + ) self.update_sprite(new_pos) - + class Bomb(Artifact): def __init__(self, *args, **kw): self.index = 0 - self.sprite = (SPRITES, (0,0), (*BOMB[self.index], *scale((1,1)))) + self.sprite = (SPRITES, (0, 0), (*BOMB[self.index], *scale((1, 1)))) self.exploded = False self.timeout = kw.pop("timeout", -1) self.radius = kw.pop("radius", 0) @@ -138,77 +202,113 @@ def __init__(self, *args, **kw): def update(self, bombs_state): for pos, timeout, radius in bombs_state: if scale(pos) == (self.x, self.y): - #It's me! + # It's me! self.timeout = int(timeout) self.radius = radius self.index = (self.index + 1) % len(BOMB) - self.sprite = (SPRITES, (0,0), (*BOMB[self.index], *scale((1,1)))) + self.sprite = (SPRITES, (0, 0), (*BOMB[self.index], *scale((1, 1)))) self.update_sprite() if self.timeout == 0: self.exploded = True self.sprite = () - self.rect.inflate_ip(self.radius*2*CHAR_LENGTH, self.radius*2*CHAR_LENGTH) - - self.image = pygame.Surface((self.radius*2*CHAR_LENGTH+CHAR_LENGTH, self.radius*2*CHAR_LENGTH+CHAR_LENGTH)) - self.image.blit(SPRITES, scale((self.radius, self.radius)), - (*EXPLOSION['c'], *scale((1,1)))) + self.rect.inflate_ip( + self.radius * 2 * CHAR_LENGTH, self.radius * 2 * CHAR_LENGTH + ) + + self.image = pygame.Surface( + ( + self.radius * 2 * CHAR_LENGTH + CHAR_LENGTH, + self.radius * 2 * CHAR_LENGTH + CHAR_LENGTH, + ) + ) + self.image.blit( + SPRITES, + scale((self.radius, self.radius)), + (*EXPLOSION["c"], *scale((1, 1))), + ) for r in range(1, self.radius): - self.image.blit(SPRITES, scale((self.radius-r, self.radius)), - (*EXPLOSION['l'], *scale((1,1)))) - self.image.blit(SPRITES, scale((self.radius+r, self.radius)), - (*EXPLOSION['r'], *scale((1,1)))) - self.image.blit(SPRITES, scale((self.radius, self.radius-r)), - (*EXPLOSION['u'], *scale((1,1)))) - self.image.blit(SPRITES, scale((self.radius, self.radius+r)), - (*EXPLOSION['d'], *scale((1,1)))) - self.image.blit(SPRITES, scale((0, self.radius)), - (*EXPLOSION['xl'], *scale((1,1)))) - self.image.blit(SPRITES, scale((2*self.radius, self.radius)), - (*EXPLOSION['xr'], *scale((1,1)))) - self.image.blit(SPRITES, scale((self.radius, 0)), - (*EXPLOSION['xu'], *scale((1,1)))) - self.image.blit(SPRITES, scale((self.radius, 2*self.radius)), - (*EXPLOSION['xd'], *scale((1,1)))) - + self.image.blit( + SPRITES, + scale((self.radius - r, self.radius)), + (*EXPLOSION["l"], *scale((1, 1))), + ) + self.image.blit( + SPRITES, + scale((self.radius + r, self.radius)), + (*EXPLOSION["r"], *scale((1, 1))), + ) + self.image.blit( + SPRITES, + scale((self.radius, self.radius - r)), + (*EXPLOSION["u"], *scale((1, 1))), + ) + self.image.blit( + SPRITES, + scale((self.radius, self.radius + r)), + (*EXPLOSION["d"], *scale((1, 1))), + ) + self.image.blit( + SPRITES, scale((0, self.radius)), (*EXPLOSION["xl"], *scale((1, 1))) + ) + self.image.blit( + SPRITES, + scale((2 * self.radius, self.radius)), + (*EXPLOSION["xr"], *scale((1, 1))), + ) + self.image.blit( + SPRITES, scale((self.radius, 0)), (*EXPLOSION["xu"], *scale((1, 1))) + ) + self.image.blit( + SPRITES, + scale((self.radius, 2 * self.radius)), + (*EXPLOSION["xd"], *scale((1, 1))), + ) + class Wall(Artifact): def __init__(self, *args, **kw): - self.sprite = (SPRITES, (0,0), (*WALL, *scale((1,1)))) + self.sprite = (SPRITES, (0, 0), (*WALL, *scale((1, 1)))) super().__init__(*args, **kw) + class Exit(Artifact): def __init__(self, *args, **kw): - self.sprite = (SPRITES, (0, 0), (*EXIT, *scale((1,1)))) + self.sprite = (SPRITES, (0, 0), (*EXIT, *scale((1, 1)))) super().__init__(*args, **kw) + class Powerups(Artifact): def __init__(self, *args, **kw): self.type = kw.pop("name") - self.sprite = (SPRITES, (0,0), (*POWERUPS[self.type], *scale((1,1)))) + self.sprite = (SPRITES, (0, 0), (*POWERUPS[self.type], *scale((1, 1)))) super().__init__(*args, **kw) + def clear_callback(surf, rect): """beneath everything there is a passage.""" surf.blit(SPRITES, (rect.x, rect.y), (*PASSAGE, rect.width, rect.height)) + def scale(pos): x, y = pos return int(x * CHAR_LENGTH / SCALE), int(y * CHAR_LENGTH / SCALE) + def draw_background(mapa): background = pygame.Surface(scale((int(mapa.size[0]), int(mapa.size[1])))) for x in range(int(mapa.size[0])): for y in range(int(mapa.size[1])): wx, wy = scale((x, y)) if mapa.map[x][y] == Tiles.STONE: - background.blit(SPRITES, (wx, wy), (*STONE, *scale((1,1)))) + background.blit(SPRITES, (wx, wy), (*STONE, *scale((1, 1)))) else: - background.blit(SPRITES, (wx, wy), (*PASSAGE, *scale((1,1,)))) + background.blit(SPRITES, (wx, wy), (*PASSAGE, *scale((1, 1)))) return background - -def draw_info(SCREEN, text, pos, color=(0,0,0), background=None): - myfont = pygame.font.Font(None, int(22/SCALE)) + + +def draw_info(SCREEN, text, pos, color=(0, 0, 0), background=None): + myfont = pygame.font.Font(None, int(22 / SCALE)) textsurface = myfont.render(text, True, color, background) x, y = pos @@ -221,15 +321,17 @@ def draw_info(SCREEN, text, pos, color=(0,0,0), background=None): SCREEN.blit(background, pos) else: erase = pygame.Surface(textsurface.get_size()) - erase.fill(COLORS['grey']) - #SCREEN.blit(erase, pos) - + erase.fill(COLORS["grey"]) + # SCREEN.blit(erase, pos) + SCREEN.blit(textsurface, pos) + async def main_loop(q): while True: await main_game() + async def main_game(): global SPRITES, SCREEN @@ -238,17 +340,17 @@ async def main_game(): enemies_group = pygame.sprite.OrderedUpdates() walls_group = pygame.sprite.OrderedUpdates() - logging.info("Waiting for map information from server") - state = await q.get() #first state message includes map information + logging.info("Waiting for map information from server") + state = await q.get() # first state message includes map information logging.debug("Initial game status: %s", state) newgame_json = json.loads(state) GAME_SPEED = newgame_json["fps"] - mapa = Map(size = newgame_json['size'], mapa = newgame_json['map']) - TIMEOUT = newgame_json['timeout'] + mapa = Map(size=newgame_json["size"], mapa=newgame_json["map"]) + TIMEOUT = newgame_json["timeout"] SCREEN = pygame.display.set_mode(scale(mapa.size)) SPRITES = pygame.image.load("data/nes.png").convert_alpha() - + BACKGROUND = draw_background(mapa) SCREEN.blit(BACKGROUND, (0, 0)) main_group.add(BomberMan(pos=mapa.bomberman_spawn)) @@ -258,46 +360,46 @@ async def main_game(): while True: pygame.event.pump() if pygame.key.get_pressed()[pygame.K_ESCAPE]: - asyncio.get_event_loop().stop() - + asyncio.get_event_loop().stop() + main_group.clear(SCREEN, clear_callback) bombs_group.clear(SCREEN, BACKGROUND) enemies_group.clear(SCREEN, clear_callback) - + if "score" in state and "player" in state: text = str(state["score"]) - draw_info(SCREEN, text.zfill(6), (0,0)) + draw_info(SCREEN, text.zfill(6), (0, 0)) text = str(state["player"]).rjust(32) - draw_info(SCREEN, text, (4000,0)) - + draw_info(SCREEN, text, (4000, 0)) + if "bombs" in state: for bomb in bombs_group: if bomb.exploded: bombs_group.remove(bomb) if len(bombs_group.sprites()) < len(state["bombs"]): pos, timeout, radius = state["bombs"][-1] - bombs_group.add(Bomb(pos=pos,timeout=timeout, radius=radius)) - bombs_group.update(state['bombs']) - - if 'enemies' in state: + bombs_group.add(Bomb(pos=pos, timeout=timeout, radius=radius)) + bombs_group.update(state["bombs"]) + + if "enemies" in state: enemies_group.empty() - for enemy in state['enemies']: - enemies_group.add(Enemy(name=enemy['name'], pos=enemy['pos'])) + for enemy in state["enemies"]: + enemies_group.add(Enemy(name=enemy["name"], pos=enemy["pos"])) - if 'walls' in state: + if "walls" in state: walls_group.empty() - for wall in state['walls']: + for wall in state["walls"]: walls_group.add(Wall(pos=wall)) - if 'exit' in state and len(state['exit']): + if "exit" in state and len(state["exit"]): if not [p for p in main_group if isinstance(p, Exit)]: logger.debug("Add Exit") - ex = Exit(pos=state['exit']) + ex = Exit(pos=state["exit"]) main_group.add(ex) main_group.move_to_back(ex) - if 'powerups' in state: - for pos, name in state['powerups']: + if "powerups" in state: + for pos, name in state["powerups"]: if name not in [p.type for p in main_group if isinstance(p, Powerups)]: logger.debug(f"Add {name}") p = Powerups(pos=pos, name=name) @@ -306,7 +408,7 @@ async def main_game(): for powerup in main_group: if isinstance(powerup, Powerups): name = powerup.type - if name not in [p[1] for p in state['powerups']]: + if name not in [p[1] for p in state["powerups"]]: logger.debug(f"Remove {name}") main_group.remove(powerup) @@ -315,39 +417,70 @@ async def main_game(): enemies_group.draw(SCREEN) bombs_group.draw(SCREEN) - #Highscores Board - if ('lives' in state and state['lives'] == 0) or\ - ('step' in state and state['step'] >= TIMEOUT) or\ - ('bomberman' in state and 'exit' in state and state['bomberman'] == state['exit']): + # Highscores Board + if ( + ("lives" in state and state["lives"] == 0) + or ("step" in state and state["step"] >= TIMEOUT) + or ( + "bomberman" in state + and "exit" in state + and state["bomberman"] == state["exit"] + ) + ): highscores = newgame_json["highscores"] - HIGHSCORES = pygame.Surface(scale((20,16))) - HIGHSCORES.fill(COLORS['grey']) + HIGHSCORES = pygame.Surface(scale((20, 16))) + HIGHSCORES.fill(COLORS["grey"]) + + draw_info(HIGHSCORES, "THE 10 BEST PLAYERS", scale((5, 1)), COLORS["white"]) + draw_info(HIGHSCORES, "RANK", scale((2, 3)), COLORS["orange"]) + draw_info(HIGHSCORES, "SCORE", scale((6, 3)), COLORS["orange"]) + draw_info(HIGHSCORES, "NAME", scale((11, 3)), COLORS["orange"]) - draw_info(HIGHSCORES, "THE 10 BEST PLAYERS", scale((5,1)), COLORS['white']) - draw_info(HIGHSCORES, "RANK", scale((2,3)), COLORS['orange']) - draw_info(HIGHSCORES, "SCORE", scale((6,3)), COLORS['orange']) - draw_info(HIGHSCORES, "NAME", scale((11,3)), COLORS['orange']) - for i, highscore in enumerate(highscores): c = (i % 5) + 1 - draw_info(HIGHSCORES, RANKS[i+1], scale((2,i+5)), list(COLORS.values())[c]) - draw_info(HIGHSCORES, str(highscore[1]), scale((6,i+5)), list(COLORS.values())[c]) - draw_info(HIGHSCORES, highscore[0], scale((11,i+5)), list(COLORS.values())[c]) + draw_info( + HIGHSCORES, + RANKS[i + 1], + scale((2, i + 5)), + list(COLORS.values())[c], + ) + draw_info( + HIGHSCORES, + str(highscore[1]), + scale((6, i + 5)), + list(COLORS.values())[c], + ) + draw_info( + HIGHSCORES, + highscore[0], + scale((11, i + 5)), + list(COLORS.values())[c], + ) + + SCREEN.blit( + HIGHSCORES, + ( + (SCREEN.get_width() - HIGHSCORES.get_width()) / 2, + (SCREEN.get_height() - HIGHSCORES.get_height()) / 2, + ), + ) + + if "bomberman" in state: + main_group.update(state["bomberman"]) - SCREEN.blit(HIGHSCORES, ((SCREEN.get_width()-HIGHSCORES.get_width())/2,(SCREEN.get_height()-HIGHSCORES.get_height())/2)) - - if 'bomberman' in state: - main_group.update(state['bomberman']) - pygame.display.flip() - + try: state = json.loads(q.get_nowait()) - if 'step' in state and state['step'] == 1 \ - or 'level' in state and state['level'] != mapa.level: - + if ( + "step" in state + and state["step"] == 1 + or "level" in state + and state["level"] != mapa.level + ): + # New level! lets clean everything up! SCREEN.blit(BACKGROUND, (0, 0)) @@ -356,20 +489,22 @@ async def main_game(): enemies_group.empty() bombs_group.empty() main_group.add(BomberMan(pos=mapa.bomberman_spawn)) - mapa.level = state['level'] - + mapa.level = state["level"] + except asyncio.queues.QueueEmpty: - await asyncio.sleep(1./GAME_SPEED) - continue - + await asyncio.sleep(1.0 / GAME_SPEED) + continue + if __name__ == "__main__": - SERVER = os.environ.get('SERVER', 'localhost') - PORT = os.environ.get('PORT', '8000') - + SERVER = os.environ.get("SERVER", "localhost") + PORT = os.environ.get("PORT", "8000") + parser = argparse.ArgumentParser() parser.add_argument("--server", help="IP address of the server", default=SERVER) - parser.add_argument("--scale", help="reduce size of window by x times", type=int, default=1) + parser.add_argument( + "--scale", help="reduce size of window by x times", type=int, default=1 + ) parser.add_argument("--port", help="TCP port", type=int, default=PORT) args = parser.parse_args() SCALE = args.scale @@ -377,10 +512,12 @@ async def main_game(): LOOP = asyncio.get_event_loop() pygame.font.init() q = asyncio.Queue() - - ws_path = f'ws://{args.server}:{args.port}/viewer' + + ws_path = f"ws://{args.server}:{args.port}/viewer" try: - LOOP.run_until_complete(asyncio.gather(messages_handler(ws_path, q), main_loop(q))) + LOOP.run_until_complete( + asyncio.gather(messages_handler(ws_path, q), main_loop(q)) + ) finally: LOOP.stop()