Source code for crash.target

# -*- coding: utf-8 -*-
# vim:set shiftwidth=4 softtabstop=4 expandtab textwidth=79:

from typing import Any, Iterator, List, Optional, Tuple, Type

import abc
import sys

import gdb

from crash.exceptions import MissingSymbolError
import crash.infra.callback

from crash.types.percpu import get_percpu_vars
from crash.util.symbols import Symbols, Symvals
from crash.util import get_typed_pointer

symbols = Symbols(['runqueues'])
symvals = Symvals(['crashing_cpu'])

[docs] class IncorrectTargetError(ValueError): """Incorrect target implementation for this kernel""" pass
PTID = Tuple[int, int, int] # This keeps stack traces from continuing into userspace and causing problems.
[docs] class KernelFrameFilter: def __init__(self, address: int) -> None: self.name = "KernelFrameFilter" self.priority = 100 self.enabled = True self.address = address gdb.frame_filters[self.name] = self
[docs] def filter(self, frame_iter: Iterator[Any]) -> Any: return KernelAddressIterator(frame_iter, self.address)
[docs] class KernelAddressIterator: def __init__(self, ii: Iterator, address: int) -> None: self.input_iterator = ii self.address = address def __iter__(self) -> Any: return self def __next__(self) -> Any: frame = next(self.input_iterator) if frame.inferior_frame().pc() < self.address: raise StopIteration return frame
# A working target will be a mixin composed of a class derived from # TargetBase and TargetFetchRegistersBase
[docs] class TargetBase(gdb.LinuxKernelTarget, metaclass=abc.ABCMeta): def __init__(self, debug: int = 0) -> None: super().__init__() self.debug = debug self.shortname = "Crash-Python Linux Target" self.longname = "Use a Core file as a Linux Kernel Target" self.ready = False self.crashing_thread: Optional[gdb.InferiorThread] = None
[docs] def open(self, name: str, from_tty: bool) -> None: if not self.fetch_registers_usable(): raise IncorrectTargetError("Not usable") if not gdb.objfiles()[0].has_symbols(): raise ValueError("Cannot debug kernel without symbol table") super().open(name, from_tty) crash.infra.callback.target_ready() self.setup_tasks()
[docs] def setup_tasks(self) -> None: # pylint complains about this. It's ugly but putting the import within # setup_tasks breaks the cycle. # pylint: disable=cyclic-import from crash.types.task import LinuxTask, types as task_types import crash.cache.tasks # pylint: disable=redefined-outer-name print("Loading tasks...", end="") sys.stdout.flush() rqs = get_percpu_vars(symbols.runqueues) rqscurrs = {int(x["curr"]) : k for (k, x) in rqs.items()} task_count = 0 try: crashing_cpu = symvals.crashing_cpu except MissingSymbolError: crashing_cpu = -1 task_struct_p_type = task_types.task_struct_type.pointer() for thread in gdb.selected_inferior().threads(): task_address = thread.ptid[2] task = get_typed_pointer(task_address, task_struct_p_type) ltask = LinuxTask(task.dereference()) active = task_address in rqscurrs if active: cpu = rqscurrs[task_address] regs = self.kdumpfile.attr.cpu[cpu].reg ltask.set_active(cpu, regs) thread.info = ltask if active and cpu == crashing_cpu: self.crashing_thread = thread self.arch_setup_thread(thread) ltask.attach_thread(thread) crash.cache.tasks.cache_task(ltask) task_count += 1 if task_count % 100 == 0: print(".", end='') sys.stdout.flush() print(" done. ({} tasks total)".format(task_count))
[docs] def close(self) -> None: pass
# pylint: disable=unused-argument
[docs] def thread_alive(self, ptid: PTID) -> bool: return True
# pylint: disable=unused-argument
[docs] def prepare_to_store(self, thread: gdb.InferiorThread) -> None: pass
[docs] @abc.abstractmethod def fetch_registers_usable(self) -> bool: pass
[docs] @abc.abstractmethod def fetch_registers(self, thread: gdb.InferiorThread, register: Optional[gdb.RegisterDescriptor]) -> Optional[gdb.RegisterCollectionType]: pass
# pylint: disable=unused-argument
[docs] def store_registers(self, thread: gdb.InferiorThread, registers: gdb.RegisterCollectionType) -> None: raise TypeError("This target is read-only.")
# pylint: disable=unused-argument
[docs] def has_execution(self, ptid: PTID) -> bool: return False
[docs] @abc.abstractmethod def arch_setup_thread(self, thread: gdb.InferiorThread) -> None: pass
[docs] @abc.abstractmethod def get_stack_pointer(self, thread: gdb.InferiorThread) -> int: pass
[docs] class TargetFetchRegistersBase(metaclass=abc.ABCMeta): """ The base class from which to implement the fetch_registers callback. The architecture code must implement the :meth:`fetch_active` and :meth:`fetch_scheduled` methods. """ _enabled: bool = False def __init__(self) -> None: super().__init__() self.fetching: bool = False # pylint: disable=unused-argument
[docs] @classmethod def enable(cls, unused: Optional[gdb.Type] = None) -> None: cls._enabled = True
[docs] @classmethod def fetch_registers_usable(cls) -> bool: return cls._enabled
[docs] @abc.abstractmethod def fetch_active(self, thread: gdb.InferiorThread, register: Optional[gdb.RegisterDescriptor]) -> gdb.RegisterCollectionType: pass
[docs] @abc.abstractmethod def fetch_scheduled(self, thread: gdb.InferiorThread, register: Optional[gdb.RegisterDescriptor]) -> gdb.RegisterCollectionType: pass
[docs] def fetch_registers(self, thread: gdb.InferiorThread, register: Optional[gdb.RegisterDescriptor]) -> Optional[gdb.RegisterCollectionType]: ret: Optional[gdb.RegisterCollectionType] = None # Don't recurse, but don't fail either if self.fetching: return None self.fetching = True try: if thread.info.active: ret = self.fetch_active(thread, register) else: ret = self.fetch_scheduled(thread, register) except AttributeError: # We still want to be able to list the threads even if we haven't # setup tasks. ret = None self.fetching = False return ret
_targets: List[Type[TargetBase]] = []
[docs] def register_target(new_target: Type[TargetBase]) -> None: _targets.append(new_target)
[docs] def setup_target() -> TargetBase: for target in _targets: t = None try: t = target() t.open("", False) return t except IncorrectTargetError: del t raise IncorrectTargetError("Could not identify target implementation for this kernel")
[docs] def check_target() -> TargetBase: target = gdb.current_target() if target is None: raise ValueError("No current target") if not isinstance(target, TargetBase): raise ValueError(f"Current target {type(target)} is not supported") return target