forked from cody2007/alpha_go_zero_implementation
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgnu_go_test.py
109 lines (93 loc) · 3.07 KB
/
gnu_go_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import subprocess as sp
from subprocess import Popen, PIPE
from time import sleep
from fcntl import fcntl, F_GETFL, F_SETFL
from os import O_NONBLOCK, read
import global_vars as gv
import numpy as np
# Strength level handed to gnugo through its --level option.
# NOTE(review): the line originally read `1#0`, presumably 10 with the
# trailing zero commented out -- confirm before raising it again.
LEVEL = 1

# Seconds slept between polls of an engine's non-blocking stdout.
PAUSE = .001

# GTP column letters indexed by 0-based board column. The protocol skips the
# letter 'I', so column 8 is 'J'; including 'I' would send/parse wrong columns
# on any board wider than 8. Covers boards up to 19 columns.
row_nm = 'ABCDEFGHJKLMNOPQRST'

# GTP colour letter indexed by player number: 0 -> 'B', 1 -> 'W'.
colors = 'BW'

# One gnugo subprocess handle per game of the batch, filled in below.
f = [None] * gv.BATCH_SZ

### start gnugo
for gm in range(gv.BATCH_SZ):
    # Each game gets its own seed (gm + 1) and talks GTP over stdin/stdout.
    f[gm] = sp.Popen(
        [
            'gnugo',
            '--chinese-rules',
            '--seed', str(gm + 1),
            '--play-out-aftermath',
            '--capture-all-dead',
            '--no-ko',
            '--never-resign',
            '--mode', 'gtp',
            '--boardsize', str(gv.n_rows),
            '--level', str(LEVEL),
        ],
        stdout=sp.PIPE,
        stdin=sp.PIPE,
    )
    # Make the engine's stdout non-blocking so readers can poll it instead of
    # stalling when no reply is available yet.
    flags = fcntl(f[gm].stdout, F_GETFL)  # get current p.stdout flags
    fcntl(f[gm].stdout, F_SETFL, flags | O_NONBLOCK)
def read_resp(gm):
    """Poll game `gm`'s engine until a complete GTP response has been read.

    The engine's stdout is non-blocking, so this sleeps `PAUSE` seconds
    between read attempts and accumulates whatever arrives. A GTP response
    is terminated by two consecutive newlines, so reading stops only once
    the accumulated text ends with '\\n\\n' (stopping on a single newline
    could return a partially delivered multi-line reply).

    Args:
        gm: index into the module-level engine list `f`.

    Returns:
        All text read during this call. It may hold several concatenated
        responses if earlier commands were written without reading replies.

    Raises:
        RuntimeError: if the engine process has exited and its output ended
            before a complete response was received.
    """
    sleep(PAUSE)
    resp = ''
    while not resp.endswith('\n\n'):
        sleep(PAUSE)
        try:
            chunk = f[gm].stdout.read()
        except (IOError, OSError):
            # Nothing available on the non-blocking pipe yet; poll again.
            # Catching only I/O errors keeps Ctrl-C able to break the loop.
            continue
        if not chunk:
            # An empty read means end-of-file (or no data); if the engine is
            # gone no more output will ever arrive, so fail instead of
            # spinning forever.
            if f[gm].poll() is not None:
                raise RuntimeError(
                    'gnugo for gm %i exited (code %s) before completing its '
                    'response; partial resp %r' % (gm, f[gm].returncode, resp))
            continue
        resp += chunk
    return resp
def req_ok(gm, cmd):
    """Send `cmd` to game `gm`'s engine and require a success reply.

    Args:
        gm: index into the module-level engine list `f`.
        cmd: complete command text to write (callers include the newline).

    Raises:
        AssertionError: if the text read back does not begin with '= '.
    """
    engine = f[gm]
    engine.stdin.write(cmd)
    reply = read_resp(gm)
    succeeded = reply.startswith('= ')
    assert succeeded, 'err reading resp gm %i, cmd %s resp %s' % (gm, cmd, reply)
def req_ok_or_illegal(gm, cmd):
    """Send `cmd` to game `gm`'s engine, tolerating an illegal-move reply.

    Args:
        gm: index into the module-level engine list `f`.
        cmd: complete command text to write (callers include the newline).

    Raises:
        AssertionError: if the text read back neither begins with '= ' nor
            contains '? illegal move'.
    """
    f[gm].stdin.write(cmd)
    reply = read_resp(gm)
    acceptable = reply.startswith('= ') or '? illegal move' in reply
    assert acceptable
def init_board(board):
    """Clear every engine's board and replay the stones found in `board`.

    Args:
        board: array indexed as board[game, row, col]. A value of 0 is an
            empty point and is skipped; any other value v is mapped to a
            colour with colors[int((v + 1) / 2)], i.e. -1 -> 'B', 1 -> 'W'
            (assumes values are limited to -1/0/1 -- TODO confirm).

    Raises:
        AssertionError: from `req_ok` if `clear_board` is not acknowledged.
    """
    for gm in range(gv.BATCH_SZ):
        req_ok(gm, 'clear_board\n')
        for i in range(gv.n_rows):
            for j in range(gv.n_cols):
                stone = board[gm, i, j]
                if stone == 0:
                    continue
                # Builtin int() replaces the removed `np.int` alias (they
                # were the same callable).
                color = colors[int((stone + 1.) / 2)]
                # Array row 0 is the top of the board, GTP row numbers count
                # up from the bottom, hence n_rows - i.
                # The reply is deliberately not read here (no req_ok): the
                # command is only written, so its response stays queued on
                # the engine's stdout.
                f[gm].stdin.write(
                    'play %s %s%i\n' % (color, row_nm[j], gv.n_rows - i))
def move_nn(to_coords, moving_player=0):
    """Forward one move per game to the engines as GTP `play` commands.

    Args:
        to_coords: one flat board index per game (row * n_cols + col), or -1
            for a pass. Any integer sequence is accepted; it is copied into
            an array before the pass mask is computed, so plain lists and
            tuples work as well as numpy arrays.
        moving_player: index into `colors` (0 -> 'B', 1 -> 'W').

    The commands are only written; their replies are not read here and stay
    queued on each engine's stdout.
    """
    # np.array copies, so the caller's data is never modified.
    flat_idx = np.array(to_coords)
    passes = flat_idx == -1
    # Give passes a valid placeholder index so unravel_index accepts them;
    # the resulting coordinates are ignored for those games.
    flat_idx[passes] = 0
    rows, cols = np.unravel_index(flat_idx, (gv.n_rows, gv.n_cols))

    color = colors[moving_player]
    for gm in range(gv.BATCH_SZ):
        if passes[gm]:
            cmd = 'play %s pass\n' % color
        else:
            # GTP rows count up from the bottom of the board.
            cmd = 'play %s %s%i\n' % (
                color, row_nm[cols[gm]], gv.n_rows - rows[gm])
        f[gm].stdin.write(cmd)
def move_ai(moving_player=1):
    """Ask every engine to generate a move and return the chosen points.

    Args:
        moving_player: index into `colors` (0 -> 'B', 1 -> 'W').

    Returns:
        int32 array of length gv.BATCH_SZ holding, per game, the flat board
        index row * n_cols + col of the engine's move. An entry stays -1 when
        the engine passed or resigned, or when its reply was not a success
        reply or was too short to contain a vertex.

    Raises:
        AssertionError: if the column letter of a reply is not in `row_nm`.

    NOTE: before issuing `genmove`, each game waits until *some* output can
    be read from the engine and discards it (presumably the queued replies
    to `play` commands that were written without being read -- confirm). If
    no output is pending, that wait does not finish.
    """
    ai_to_coords = np.full(gv.BATCH_SZ, -1, dtype='int32')

    for gm in range(gv.BATCH_SZ):
        # Block until one read succeeds, throwing the data away. Only I/O
        # errors (nothing available on the non-blocking pipe) are retried so
        # that Ctrl-C still interrupts; the short sleep avoids a hot spin.
        while True:
            try:
                f[gm].stdout.read()
                break
            except (IOError, OSError):
                sleep(PAUSE)
        f[gm].stdin.write('genmove %s\n' % colors[moving_player])

    for gm in range(gv.BATCH_SZ):
        ai_mv_orig = read_resp(gm)
        # The text may hold several responses separated by blank lines; the
        # last one answers genmove. Stripping trailing newlines first avoids
        # an IndexError (or picking the wrong reply) when the text does not
        # end in exactly two newlines.
        ai_mv = ai_mv_orig.rstrip('\n').split('\n\n')[-1]

        if ai_mv[:2] != '= ':
            print('failed gm %i resp %s' % (gm, ai_mv))
            continue
        if '= PASS' in ai_mv:
            continue
        if '= resign' in ai_mv:
            print('resign  %i' % gm)
            continue
        if len(ai_mv) <= 3:
            # Success reply without a vertex: leave the entry at -1. (The
            # former `assert '<message>'` here was always true, i.e. a no-op.)
            continue

        # Reply looks like '= C3': column letter at index 2, row number after.
        col = row_nm.find(ai_mv[2])
        assert col != -1, 'gm %i resp %s' % (gm, ai_mv)
        # GTP rows count up from the bottom; array rows count down from the
        # top. Builtin int() replaces the removed `np.int` alias.
        row = gv.n_rows - int(ai_mv[3:])
        ai_to_coords[gm] = row * gv.n_cols + col

    return ai_to_coords
def show_board(gm):
    """Print the engine's text rendering of game `gm`'s board.

    Writes the GTP `showboard` command to the engine and prints whatever
    `read_resp` returns for it.

    Args:
        gm: index into the module-level engine list `f`.
    """
    engine = f[gm]
    engine.stdin.write('showboard\n')
    board_text = read_resp(gm)
    print(board_text)