Source code for crash.subsystem.printk.structured_ringbuffer
# -*- coding: utf-8 -*-
# vim:set shiftwidth=4 softtabstop=4 expandtab textwidth=79:
from typing import Dict, Iterable, Any
import argparse
import gdb
from crash.util.symbols import Types, Symvals
from crash.exceptions import DelayedAttributeError
from crash.subsystem.printk import LogTypeException
# gdb types needed to walk the log buffer.  They are read further down in
# this module as ``types.printk_log_p_type`` and ``types.char_p_type``.
types = Types(['struct printk_log *', 'char *'])
# Kernel symbols describing the structured printk log.  Not every name is
# read by the code in this module; a failed lookup of ``log_first_idx`` is
# what get_log_msgs() uses to detect that the log is not structured.
# NOTE(review): presumably both collections resolve their entries lazily on
# first attribute access -- confirm against crash.util.symbols.
symvals = Symvals(['log_buf', 'log_buf_len', 'log_first_idx', 'log_next_idx',
'clear_seq', 'log_first_seq', 'log_next_seq'])
[docs]
def _read_log_string(start: gdb.Value, length: int) -> str:
    """
    Decode ``length`` bytes starting at ``start`` into a string.

    If the bytes cannot be decoded, the error is printed and the bytes are
    decoded again with undecodable sequences replaced, so that the caller
    always gets a string back.
    """
    try:
        return start.string(length=length)
    except UnicodeDecodeError as e:
        print(e)
        return start.string(errors='replace', length=length)


def log_from_idx(logbuf: gdb.Value, idx: int) -> Dict:
    """
    Decode the ``struct printk_log`` record at offset ``idx`` of ``logbuf``.

    Args:
        logbuf: Pointer to the start of the log buffer.
        idx: Offset of the record from the start of the buffer.

    Returns:
        A dictionary with the keys:

        - ``text``: the message text
        - ``timestamp``: the record's ``ts_nsec`` field
        - ``level``: the record's ``level`` field
        - ``next``: offset of the following record (0 if the log wraps)
        - ``dict``: the raw dictionary payload stored after the text
    """
    msg = (logbuf + idx).cast(types.printk_log_p_type)

    textlen = int(msg['text_len'])
    dictlen = int(msg['dict_len'])

    # The text starts right after the fixed-size record header and the
    # dictionary starts right after the text.
    payload = (msg.cast(types.char_p_type) +
               types.printk_log_p_type.target().sizeof)

    # Previously an undecodable text left ``text`` unbound and the return
    # statement below failed; both payloads now decode leniently.
    text = _read_log_string(payload, textlen)
    msgdict = _read_log_string(payload + textlen, dictlen)

    msglen = int(msg['len'])

    # A zero-length message means we wrap back to the beginning
    if msglen == 0:
        nextidx = 0
    else:
        nextidx = idx + msglen

    return {
        'text' : text,
        'timestamp' : int(msg['ts_nsec']),
        'level' : int(msg['level']),
        'next' : nextidx,
        'dict' : msgdict,
    }
[docs]
def get_log_msgs() -> Iterable[Dict[str, Any]]:
    """
    Yield the records of the structured printk log in buffer order.

    Yields:
        One dictionary per record, as returned by log_from_idx().

    Raises:
        LogTypeException: The ``log_first_idx`` symbol could not be
            resolved, i.e. the log is not a structured log.
    """
    try:
        idx = int(symvals.log_first_idx)
    except DelayedAttributeError:
        raise LogTypeException('not structured log') from None

    # Start counting at clear_seq, but never below log_first_seq.  The
    # clamp is applied to a local value; the shared symvals container is
    # left untouched.
    seq = max(int(symvals.clear_seq), int(symvals.log_first_seq))
    end_seq = int(symvals.log_next_seq)

    # NOTE(review): the walk always begins at log_first_idx, even when
    # clear_seq is ahead of log_first_seq; in that case only
    # (log_next_seq - clear_seq) records are yielded, counted from the
    # first record in the buffer.  Confirm this is the intended behavior.
    while seq < end_seq:
        msg = log_from_idx(symvals.log_buf, idx)
        seq += 1
        idx = msg['next']
        yield msg
[docs]
def structured_rb_show(args: argparse.Namespace) -> None:
    """
    Print every message of the structured printk log.

    Args:
        args: Parsed command options.  A true ``t`` omits the
            ``[seconds.microseconds]`` prefix, a true ``m`` adds a
            ``<level>`` prefix, and a true ``d`` also prints the entries
            of each message's dictionary.
    """
    for record in get_log_msgs():
        stamp = ''
        if not args.t:
            # The timestamp is in nanoseconds; display it as whole
            # seconds plus microseconds.
            secs, rem_nsecs = divmod(int(record['timestamp']), 1000000000)
            stamp = '[{:5d}.{:06d}] '.format(secs, rem_nsecs // 1000)

        prefix = '<{:d}>'.format(record['level']) if args.m else ''

        # Multi-line messages get the same prefix on every line.
        for text_line in record['text'].split('\n'):
            print('{}{}{}'.format(prefix, stamp, text_line))

        if not (args.d and record['dict']):
            continue

        # Dictionary entries are separated by NUL characters.
        for item in record['dict'].split('\0'):
            print(' {}'.format(item))