"""Reader for training log.
|
|
|
|
See lib/Analysis/TrainingLogger.cpp for a description of the format.
|
|
"""

import ctypes
import dataclasses
import io
import json
import math
import sys
from typing import List, Optional

# Maps the element type names used in the log header to ctypes scalar types.
_element_types = {
    'float': ctypes.c_float,
    'double': ctypes.c_double,
    'int8_t': ctypes.c_int8,
    'uint8_t': ctypes.c_uint8,
    'int16_t': ctypes.c_int16,
    'uint16_t': ctypes.c_uint16,
    'int32_t': ctypes.c_int32,
    'uint32_t': ctypes.c_uint32,
    'int64_t': ctypes.c_int64,
    'uint64_t': ctypes.c_uint64
}

@dataclasses.dataclass(frozen=True)
class TensorSpec:
    """Describes one tensor: its name, port, shape, and element type."""
    name: str
    port: int
    shape: List[int]
    element_type: type

    @staticmethod
    def from_dict(d: dict):
        name = d['name']
        port = d['port']
        shape = [int(e) for e in d['shape']]
        element_type_str = d['type']
        if element_type_str not in _element_types:
            raise ValueError(f'unknown type: {element_type_str}')
        return TensorSpec(
            name=name,
            port=port,
            shape=shape,
            element_type=_element_types[element_type_str])
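
# Illustration only (the field values below are made up, not taken from a
# real log): a header entry such as
#   {"name": "reward", "port": 0, "shape": [1], "type": "float"}
# would become
#   TensorSpec(name='reward', port=0, shape=[1], element_type=ctypes.c_float).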

class TensorValue:
    """Typed, indexable view over the raw bytes of a single tensor."""

    def __init__(self, spec: TensorSpec, buffer: bytes):
        self._spec = spec
        self._buffer = buffer
        self._view = ctypes.cast(
            self._buffer, ctypes.POINTER(self._spec.element_type))
        self._len = math.prod(self._spec.shape)

    def spec(self) -> TensorSpec:
        return self._spec

    def __len__(self) -> int:
        return self._len

    def __getitem__(self, index):
        if index < 0 or index >= self._len:
            raise IndexError(f'Index {index} out of range [0..{self._len})')
        return self._view[index]

def read_tensor(fs: io.BufferedReader, ts: TensorSpec) -> TensorValue:
    # Read exactly the number of bytes implied by the spec's shape and
    # element type.
    size = math.prod(ts.shape) * ctypes.sizeof(ts.element_type)
    data = fs.read(size)
    return TensorValue(ts, data)

def pretty_print_tensor_value(tv: TensorValue):
    print(f'{tv.spec().name}: {",".join([str(v) for v in tv])}')

def read_header(f: io.BufferedReader):
    # The first line of the log is a JSON header listing the feature tensor
    # specs and, optionally, the score and advice specs.
    header = json.loads(f.readline())
    tensor_specs = [TensorSpec.from_dict(ts) for ts in header['features']]
    score_spec = TensorSpec.from_dict(
        header['score']) if 'score' in header else None
    advice_spec = TensorSpec.from_dict(
        header['advice']) if 'advice' in header else None
    return tensor_specs, score_spec, advice_spec

def read_one_observation(context: Optional[str], event_str: str,
                         f: io.BufferedReader, tensor_specs: List[TensorSpec],
                         score_spec: Optional[TensorSpec]):
    event = json.loads(event_str)
    # A 'context' event applies to the observations that follow it; otherwise
    # keep the context passed in by the caller.
    if 'context' in event:
        context = event['context']
        event = json.loads(f.readline())
    observation_id = int(event['observation'])
    # The feature tensors follow as raw bytes, in header order, terminated by
    # a newline.
    features = []
    for ts in tensor_specs:
        features.append(read_tensor(f, ts))
    f.readline()
    score = None
    if score_spec is not None:
        # The score is preceded by an 'outcome' event that must reference the
        # same observation id, and is also terminated by a newline.
        score_header = json.loads(f.readline())
        assert int(score_header['outcome']) == observation_id
        score = read_tensor(f, score_spec)
        f.readline()
    return context, observation_id, features, score

def read_stream(fname: str):
    # Generator yielding (context, observation_id, features, score) tuples
    # until the log file is exhausted.
    with io.BufferedReader(io.FileIO(fname, 'rb')) as f:
        tensor_specs, score_spec, _ = read_header(f)
        context = None
        while True:
            event_str = f.readline()
            if not event_str:
                break
            context, observation_id, features, score = read_one_observation(
                context, event_str, f, tensor_specs, score_spec)
            yield context, observation_id, features, score

def main(args):
    last_context = None
    for ctx, obs_id, features, score in read_stream(args[1]):
        if last_context != ctx:
            print(f'context: {ctx}')
            last_context = ctx
        print(f'observation: {obs_id}')
        for fv in features:
            pretty_print_tensor_value(fv)
        if score:
            pretty_print_tensor_value(score)

if __name__ == '__main__':
    main(sys.argv)
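
# Example usage (a sketch; the log path is a placeholder and the file name
# 'log_reader.py' is assumed):
#
#   $ python3 log_reader.py /path/to/training.log
#
# or, programmatically:
#
#   for context, obs_id, features, score in read_stream('/path/to/training.log'):
#       pretty_print_tensor_value(features[0])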