-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathxor.py
129 lines (112 loc) · 3.21 KB
/
xor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# A very simple script for Luby Transform Code
import numpy as np
from random import random
from abc import *
from dataclasses import dataclass
from typing import *
@dataclass
class CodewordBatch:
    """A batch of LT codewords, stored with a leading batch axis.

    Fields parallel Codeword; see that class for per-codeword semantics.
    """
    shift : int         # window/offset bookkeeping -- semantics not shown in this file; TODO confirm
    index : np.ndarray  # per-codeword indices of the source packets XORed together
    data : np.ndarray   # XORed payloads; presumably uint8 of shape [batch, psize] -- confirm against encoder
@dataclass
class Codeword:
    """A single LT codeword: the XOR of several source packets."""
    shift : int         # window/offset bookkeeping -- semantics not shown in this file; TODO confirm
    index : np.ndarray  # indices of the source packets XORed into this codeword
    data : np.ndarray   # XORed payload; presumably uint8 of shape [psize] -- confirm against encoder
class Decoder(ABC):
    """Abstract interface for decoders that recover source data from LT codewords."""

    @abstractmethod
    def put_one(self, code: Codeword):
        """Feed one received codeword into the decoder."""
        ...

    @abstractmethod
    def put_bat(self, code: CodewordBatch):
        """Feed a batch of received codewords into the decoder."""
        ...

    @abstractmethod
    def get(self) -> Optional[bytes]:
        """Return recovered input from the decoder (None if unavailable)."""
        ...
class Encoder(ABC):
    """Abstract interface for encoders that turn source packets into LT codewords."""

    @abstractmethod
    def put_one(self, data: np.ndarray):
        """Feed one input packet into the encoder."""
        ...

    @abstractmethod
    def put_bat(self, data: np.ndarray):
        """Feed a batch of input packets into the encoder."""
        ...

    @abstractmethod
    def get_one(self) -> Codeword:
        """Produce one codeword from the encoder."""
        ...

    @abstractmethod
    def get_bat(self) -> CodewordBatch:
        """Produce a batch of codewords from the encoder."""
        ...
class LTEncoder(Encoder):
    """
    Luby Transform Encoder.

    Buffers source packets (rows of ``self.data``) and emits codewords,
    each the XOR of ``d`` uniformly chosen packets, where ``d`` is drawn
    from the degree distribution supplied at construction.
    """
    def __init__(self, dd: np.ndarray, psize: int):
        """
        @param(dd): degree distribution array of shape [d];
                    dd[i] is the probability of degree i+1
        @param(psize): the input packet size
        @field(data): buffered source packets, uint8 array of shape [l, psize]
        """
        super().__init__()
        self.data = np.zeros((0, psize), dtype=np.uint8)
        # Cumulative distribution for inverse-CDF degree sampling.
        self.prob = dd.cumsum(0)
        # Pin the last entry to exactly 1 so float rounding cannot push a
        # uniform sample past the end of the table.
        self.prob[-1] = 1

    def _sample_degree(self, size=None):
        """Inverse-CDF sample of degree(s) from self.prob (scalar if size is None)."""
        u = np.random.random(size)
        # First index i with prob[i] >= u corresponds to degree i+1.
        # (The original `(u < prob).sum() + 1` was inverted: for
        # prob=[0.5, 1.0] and u=0.3 it produced the impossible degree 3.)
        return np.searchsorted(self.prob, u) + 1

    def get_one(self) -> Codeword:
        """
        @return a Codeword XORing d inputs, d sampled from the degree
                distribution
        @raises ValueError if no input packets have been buffered yet
        """
        if self.data.shape[0] == 0:
            raise ValueError("no input packets buffered")
        degree = int(self._sample_degree())
        index = np.random.randint(0, self.data.shape[0], size=(degree,))
        # shift is 0: windowing/shift semantics are not used by this encoder.
        return Codeword(0, index, np.bitwise_xor.reduce(self.data[index]))

    def get_bat(self, batch: int) -> CodewordBatch:
        """
        @param(batch): number of codewords to generate
        @return a CodewordBatch of `batch` codewords, each XORing an
                independently sampled number of inputs
        @raises ValueError if no input packets have been buffered yet
        """
        if self.data.shape[0] == 0:
            raise ValueError("no input packets buffered")
        degrees = self._sample_degree((batch,))
        # Degrees vary per codeword, so index rows are ragged: store them
        # in an object array alongside a dense uint8 payload matrix.
        index = np.empty(batch, dtype=object)
        data = np.empty((batch, self.data.shape[1]), dtype=np.uint8)
        for i, d in enumerate(degrees):
            idx = np.random.randint(0, self.data.shape[0], size=(int(d),))
            index[i] = idx
            data[i] = np.bitwise_xor.reduce(self.data[idx])
        return CodewordBatch(0, index, data)

    def put_one(self, data: np.ndarray):
        """
        @param(data) one input packet, uint8 array of shape [psize]
        """
        assert data.dtype == np.uint8
        assert len(data.shape) == 1
        assert data.shape[-1] == self.data.shape[-1]
        self.data = np.concatenate([self.data, data.reshape(1, -1)], axis=0)

    def put_bat(self, data: np.ndarray):
        """
        @param(data) a batch of input packets, uint8 array of shape [n, psize]
        """
        assert data.dtype == np.uint8
        assert len(data.shape) == 2
        assert data.shape[-1] == self.data.shape[-1]
        self.data = np.concatenate([self.data, data], axis=0)
class LTDecoder(Decoder):
    """
    Peeling Decoder for LT codes (work in progress).

    NOTE(review): the Decoder abstract methods (put_one, put_bat, get) are
    not implemented here, so this class cannot be instantiated yet.
    """
    def __init__(self):
        # Buffer of received codewords awaiting peeling.
        self.buff = []
if __name__ == '__main__':
    # Smoke test: buffer two known packets, then emit one codeword.
    uniform_dd = np.array([0.5, 0.5])
    encoder = LTEncoder(uniform_dd, 1024)
    for packet in (np.zeros(1024, dtype=np.uint8), np.ones(1024, dtype=np.uint8)):
        encoder.put_one(packet)
    print(encoder.get_one())