Source code for crash.subsystem.printk.lockless_ringbuffer
# -*- coding: utf-8 -*-
# vim:set shiftwidth=4 softtabstop=4 expandtab textwidth=79:
import argparse
import gdb
from crash.util.symbols import Types, Symvals
from crash.exceptions import DelayedAttributeError
from crash.subsystem.printk import LogTypeException
# Type lookups used throughout this module.  Each entry is reached below as
# an attribute of ``types``: ``printk_info_p_type``, ``prb_desc_p_type``,
# ``prb_data_block_p_type``, ``unsigned_long_type`` and ``char_p_type``.
types = Types(['struct printk_info *',
'struct prb_desc *',
'struct prb_data_block *',
'unsigned long',
'char *'])
# Kernel symbols requested by this module.  ``symvals.prb`` is read by
# lockless_rb_show(); 'clear_seq' is not referenced by the code in this file.
symvals = Symvals(['prb', 'clear_seq'])
# TODO: put to separate type
[docs]
def atomic_long_read(val: gdb.Value) -> int:
    ''' Return the ``counter`` member of the given value as a Python int. '''
    counter = val["counter"]
    return int(counter)
[docs]
def read_null_end_string(buf: gdb.Value) -> str:
    '''
    Decode the buffer as UTF-8 (undecodable bytes are replaced) and
    return everything in front of the first NUL character.
    '''
    decoded = buf.string(encoding='utf8', errors='replace')
    # Drop the terminator and anything that follows it.
    return decoded.split('\0', 1)[0]
[docs]
class LogConsistencyException(Exception):
    ''' Raised when no consistent entry can be found in the ring buffer. '''
[docs]
class DevPrintkInfo:
    ''' Python view of the kernel struct dev_printk_info '''
    subsystem: str
    device: str

    def __init__(self, info: gdb.Value) -> None:
        '''
        Decode the ``subsystem`` and ``device`` members of ``info``
        into Python strings.
        '''
        for member in ('subsystem', 'device'):
            setattr(self, member, read_null_end_string(info[member]))
[docs]
class PrintkInfo:
    ''' Python view of the kernel struct printk_info '''
    seq: int        # sequence number
    ts_nsec: int    # timestamp in nanoseconds
    text_len: int   # length of text message
    facility: int   # syslog facility
    flags: int      # internal record flags
    level: int      # syslog level
    caller_id: int  # thread id or processor id
    dev_info: DevPrintkInfo

    # Members that are copied verbatim as Python integers.
    _INT_MEMBERS = ('seq', 'ts_nsec', 'text_len', 'facility',
                    'flags', 'level', 'caller_id')

    def __init__(self, info: gdb.Value) -> None:
        ''' Copy all members of ``info`` into Python attributes. '''
        for member in self._INT_MEMBERS:
            setattr(self, member, int(info[member]))
        self.dev_info = DevPrintkInfo(info['dev_info'])
[docs]
class PrbDataBlkLPos:
    ''' Kernel struct prb_data_blk_lpos '''
    begin: int
    next: int

    def __init__(self, blk_lpos: gdb.Value) -> None:
        ''' Store the ``begin`` and ``next`` members as Python integers. '''
        raw_begin = blk_lpos['begin']
        self.begin = int(raw_begin)
        raw_next = blk_lpos['next']
        self.next = int(raw_next)
[docs]
class PrbDesc:
    ''' Kernel struct prb_desc '''
    state_var: int
    text_blk_lpos: PrbDataBlkLPos
    sv_shift: int
    sv_mask: int

    def __init__(self, desc: gdb.Value) -> None:
        '''
        Read the state variable and the text block position of ``desc``
        and prepare the mask used to extract the descriptor state.
        '''
        self.state_var = atomic_long_read(desc['state_var'])
        self.text_blk_lpos = PrbDataBlkLPos(desc['text_blk_lpos'])

        # The state is kept in the two most significant bits
        # of an unsigned long.
        bits_per_long = 8 * types.unsigned_long_type.sizeof
        self.sv_shift = bits_per_long - 2
        self.sv_mask = 0x3 << self.sv_shift

    def desc_state(self) -> int:
        ''' Return the state of the descriptor '''
        state_bits = self.state_var & self.sv_mask
        return state_bits >> self.sv_shift

    def is_finalized(self) -> bool:
        ''' A finalized descriptor points to a valid (data) message '''
        state = self.desc_state()
        return state == 0x2

    def is_reusable(self) -> bool:
        '''
        A reusable descriptor still has a valid sequence number
        but the data are gone.
        '''
        state = self.desc_state()
        return state == 0x3
[docs]
class PrbDataBlock:
    ''' Kernel struct prb_data_block '''
    id: int
    data: gdb.Value

    def __init__(self, dr: gdb.Value) -> None:
        '''
        Keep the ``id`` member as a Python integer and the ``data``
        member as the raw value.
        '''
        block_id = dr['id']
        self.id = int(block_id)
        self.data = dr['data']
[docs]
class PrbDataRing:
    ''' Kernel struct prb_data_ring '''
    size_bits: int
    data: gdb.Value
    lpos_mask: int

    def __init__(self, dr: gdb.Value) -> None:
        '''
        Read the ``size_bits`` and ``data`` members of ``dr`` and compute
        the mask that turns a logical position into an offset into the
        data array.
        '''
        self.size_bits = int(dr['size_bits'])
        self.data = dr['data']
        self.lpos_mask = (1 << self.size_bits) - 1

    def _block_index(self, blk_lpos: PrbDataBlkLPos) -> int:
        ''' Return the offset of the data block inside the data array '''
        # A block that does not fit in front of the end of the array is
        # stored by the kernel at the very beginning of the array instead.
        # Such a block is recognized by its first and its last byte living
        # in different wraps.  "next" is the position behind the block,
        # hence "next - 1" is the last byte that belongs to it.
        #
        # A block spans at most two adjacent wraps, and adjacent wrap
        # counters always differ in their lowest bit.  Comparing only that
        # bit keeps the check correct when the unsigned long position
        # overflows.
        first_wrap = blk_lpos.begin >> self.size_bits
        last_wrap = (blk_lpos.next - 1) >> self.size_bits
        if (first_wrap ^ last_wrap) & 1:
            return 0
        return blk_lpos.begin & self.lpos_mask

    def get_data_block(self, blk_lpos: PrbDataBlkLPos) -> PrbDataBlock:
        ''' Return PrbDataBlock for the given blk_lpos '''
        blk_p = self.data.cast(types.char_p_type) + self._block_index(blk_lpos)
        return PrbDataBlock(blk_p.cast(types.prb_data_block_p_type))

    def get_text(self, blk_lpos: PrbDataBlkLPos, _len: int) -> str:
        '''
        Return the string of length ``_len`` stored at the given blk_lpos.

        The text is decoded the same way as in read_null_end_string():
        as UTF-8 with undecodable bytes replaced, so that a single broken
        message can not abort the whole log output.
        '''
        data_block = self.get_data_block(blk_lpos)
        text_p = data_block.data.cast(types.char_p_type)
        return text_p.string(encoding='utf8', errors='replace', length=_len)
[docs]
class PrbDescRing:
    ''' Kernel struct prb_desc_ring '''
    count_bits: int
    descs: gdb.Value
    infos: gdb.Value
    head_id: int
    tail_id: int
    mask_id: int

    def __init__(self, dr: gdb.Value) -> None:
        '''
        Read the members of ``dr`` and compute the mask that turns
        an id into an index into the descriptor and info arrays.
        '''
        bits = int(dr['count_bits'])
        self.count_bits = bits
        self.descs = dr['descs']
        self.infos = dr['infos']
        self.head_id = atomic_long_read(dr['head_id'])
        self.tail_id = atomic_long_read(dr['tail_id'])
        self.mask_id = (1 << bits) - 1

    def get_idx(self, _id: int) -> int:
        ''' Return the index into the desc ring for the given id '''
        index = _id & self.mask_id
        return index

    def get_desc(self, _id: int) -> PrbDesc:
        ''' Return the prb_desc structure for the given id '''
        desc_p_type = types.prb_desc_p_type
        # Byte offset of the requested element from the array start.
        offset = desc_p_type.target().sizeof * self.get_idx(_id)
        array_start = self.descs.cast(types.char_p_type)
        return PrbDesc((array_start + offset).cast(desc_p_type))

    def get_info(self, _id: int) -> PrintkInfo:
        ''' Return the printk_info structure for the given id '''
        info_p_type = types.printk_info_p_type
        # Byte offset of the requested element from the array start.
        offset = info_p_type.target().sizeof * self.get_idx(_id)
        array_start = self.infos.cast(types.char_p_type)
        return PrintkInfo((array_start + offset).cast(info_p_type))
[docs]
class PrbRingBuffer:
    '''
    Kernel struct prb_ring_buffer

    Combines the descriptor ring and the text data ring and prints
    the messages stored in them.
    '''
    desc_ring: PrbDescRing
    data_ring: PrbDataRing

    def __init__(self, prb: gdb.Value) -> None:
        '''
        Build the Python views of the ``desc_ring`` and
        ``text_data_ring`` members of ``prb``.
        '''
        self.desc_ring = PrbDescRing(gdb.Value(prb['desc_ring']))
        self.data_ring = PrbDataRing(gdb.Value(prb['text_data_ring']))

    def is_valid_desc(self, desc: PrbDesc, info: PrintkInfo, seq: int) -> bool:
        ''' Does the descriptor contain consistent values? '''
        if not (desc.is_finalized() or desc.is_reusable()):
            return False

        # Must match the expected seq number. Otherwise is being updated.
        return info.seq == seq

    def first_seq(self) -> int:
        '''
        Get sequence number of the tail entry.

        Raises:
            LogConsistencyException: Neither the tail entry nor the entry
                following it is in finalized or reusable state.
        '''
        # The lockless algorithm guarantees that the tail entry
        # always points to a descriptor in finalized or reusable state.
        # The only exception is when the tail is being moved
        # to the next entry, see prb_first_seq() in printk_ringbuffer.c
        #
        # As a result, the valid sequence number should be either in tail_id
        # or tail_id + 1 entry. Both of them have to be checked.
        tail_id = self.desc_ring.tail_id
        for _id in (tail_id, tail_id + 1):
            desc = self.desc_ring.get_desc(_id)
            if desc.is_finalized() or desc.is_reusable():
                return self.desc_ring.get_info(_id).seq

        # Something went wrong. Do not continue with an invalid sequence number.
        raise LogConsistencyException('Can not find valid info in the tail descriptor')

    def show_msg(self, desc: PrbDesc, info: PrintkInfo,
                 args: argparse.Namespace) -> None:
        '''
        Show the message for the given descriptor and printk info.

        The output is modified by pylog parameters: ``args.t`` suppresses
        the timestamp, ``args.m`` adds the log level prefix and ``args.d``
        prints the device information below the message.
        '''
        timestamp = ''
        if not args.t:
            # Seconds and microseconds since boot.
            secs, nsecs = divmod(info.ts_nsec, 1000000000)
            timestamp = '[{:5d}.{:06d}] '.format(secs, nsecs // 1000)

        level = ''
        if args.m:
            level = '<{:d}>'.format(info.level)

        text = self.data_ring.get_text(desc.text_blk_lpos, info.text_len)
        print('{}{}{}'.format(level, timestamp, text))

        if args.d:
            # Only two dev_info values are supported at the moment
            if info.dev_info.subsystem:
                print(' SUBSYSTEM={}'.format(info.dev_info.subsystem))
            if info.dev_info.device:
                print(' DEVICE={}'.format(info.dev_info.device))

    def show_log(self, args: argparse.Namespace) -> None:
        '''
        Show the entire log.

        Raises:
            LogConsistencyException: No valid first entry was found,
                see first_seq().
        '''
        seq = self.first_seq()

        # Iterate over all entries with valid sequence number
        while True:
            desc = self.desc_ring.get_desc(seq)
            info = self.desc_ring.get_info(seq)
            if not self.is_valid_desc(desc, info, seq):
                break
            seq += 1

            # Sequence numbers are stored in separate ring buffer.
            # The descriptor ring might include valid sequence numbers
            # but the data might already be replaced.
            if desc.is_reusable():
                continue

            self.show_msg(desc, info, args)
[docs]
def lockless_rb_show(args: argparse.Namespace) -> None:
    """
    Try to show printk log stored in the lockless ringbuffer

    This type of ringbuffer has replaced the structured ring buffer
    in kernel-5.10.

    Args:
        args: Parsed pylog parameters, passed on to
            PrbRingBuffer.show_log().

    Raises:
        LogTypeException: The log is not in the lockless ringbuffer.
    """
    try:
        ringbuffer = PrbRingBuffer(symvals.prb)
    except DelayedAttributeError:
        # Hide the lookup failure; the caller only needs to know
        # that this log format does not apply.
        raise LogTypeException('not lockless log') from None
    else:
        ringbuffer.show_log(args)