From cd0366849014d10a8925fdfe4054751ab9a4c3b8 Mon Sep 17 00:00:00 2001 From: Niklas Bittner Date: Sun, 12 Jan 2025 04:13:04 +0100 Subject: [PATCH] feat: basic packet capturing --- README.md | 10 ++ .../commands/__init__.py | 0 resimulate/commands/record.py | 115 ++++++++++++++++++ resimulate/commands/replay.py | 1 + {src => resimulate}/resimulate.py | 49 +++++--- .../replay.py => resimulate/util/__init__.py | 0 resimulate/util/dummy_sim_link.py | 41 +++++++ resimulate/util/logger.py | 30 +++++ resimulate/util/tracer.py | 88 ++++++++++++++ 9 files changed, 320 insertions(+), 14 deletions(-) rename src/commands/record.py => resimulate/commands/__init__.py (100%) create mode 100644 resimulate/commands/record.py create mode 100644 resimulate/commands/replay.py rename {src => resimulate}/resimulate.py (57%) rename src/commands/replay.py => resimulate/util/__init__.py (100%) create mode 100644 resimulate/util/dummy_sim_link.py create mode 100644 resimulate/util/logger.py create mode 100644 resimulate/util/tracer.py diff --git a/README.md b/README.md index e69de29..7243483 100644 --- a/README.md +++ b/README.md @@ -0,0 +1,10 @@ +# ReSIMulate + +**ReSIMulate** is a terminal application for analyzing eSIM and SIM card interactions. It allows you to record APDU commands, save them, and replay them for differential testing and debugging. + +## Features + +- Record APDU commands from a device or interface. +- Replay saved APDU commands to a target device. +- Supports configurable timeouts, devices, and verbose output. +- Enables differential testing for SIM interactions. diff --git a/src/commands/record.py b/resimulate/commands/__init__.py similarity index 100% rename from src/commands/record.py rename to resimulate/commands/__init__.py diff --git a/resimulate/commands/record.py b/resimulate/commands/record.py new file mode 100644 index 0000000..7a6e1c4 --- /dev/null +++ b/resimulate/commands/record.py @@ -0,0 +1,115 @@ +import pickle +from queue import Empty, Queue +from threading import Thread +import time + +from pySim.apdu import Apdu, ApduCommand +from pySim.apdu_source.gsmtap import ApduSource +from rich.live import Live +from util.logger import log +from util.tracer import Tracer + +from rich.console import Group +from rich.panel import Panel +from rich.progress import ( + BarColumn, + Progress, + SpinnerColumn, + TextColumn, + TimeElapsedColumn, +) + + +class Recorder: + def __init__(self, source: ApduSource): + self.tracer = Tracer(source) + + self.captured_apdus = [] + + self.package_queue: Queue[tuple[Apdu, ApduCommand]] = Queue() + self.tracer_thread = Thread( + target=self.tracer.main, args=(self.package_queue,), daemon=True + ) + + def record(self, output_path: str, timeout: int): + capture_progress = Progress( + TimeElapsedColumn(), + TextColumn("{task.completed}"), + TextColumn("[bold blue]{task.fields[packet_type]}"), + TextColumn("[bold green]{task.fields[packet_description]}"), + TextColumn("{task.fields[packet_code]}"), + ) + + overall_progress = Progress( + TimeElapsedColumn(), + BarColumn(), + TextColumn("{task.description}"), + ) + + main_group = Group( + Panel(capture_progress), + overall_progress, + ) + + overall_task_id = overall_progress.add_task( + f"[bold red]{len(self.captured_apdus)} packets captured!", + start=True, + total=None, + ) + + with Live(main_group): + self.tracer_thread.start() + + while self.tracer_thread.is_alive(): + + try: + apdu, apdu_command = self.package_queue.get(timeout=timeout) + + if apdu is None: + log.debug("No more APDU packets to capture.") + break + + capture_task_id = capture_progress.add_task( + "", + completed=len(self.captured_apdus), + packet_type=str(apdu_command._name) or "", + packet_description=str(apdu_command.path_str), + packet_code=str(apdu_command.col_sw), + ) + + self.captured_apdus.append(apdu) + + capture_progress.stop_task(capture_task_id) + except TimeoutError: + log.debug("Timeout reached, stopping capture.") + break + except Empty: + log.debug("No more APDU packets to capture.") + break + except UnboundLocalError as e: + log.debug("Error capturing APDU packets: %s", e) + break + + overall_progress.update( + overall_task_id, + description=f"[bold green]{len(self.captured_apdus)} packet(s) captured!", + ) + + capture_progress.stop_task(capture_task_id) + + overall_progress.update( + overall_task_id, + description="[bold yellow]Saving captured APDU commands...", + ) + + log.debug("Saving captured APDU commands to %s", output_path) + + with open(output_path, "wb") as f: + pickle.dump(self.captured_apdus, f) + time.sleep(3) + + overall_progress.update( + overall_task_id, + description="[bold green]Captured %s APDU packets!" + % len(self.captured_apdus), + ) diff --git a/resimulate/commands/replay.py b/resimulate/commands/replay.py new file mode 100644 index 0000000..09d0fca --- /dev/null +++ b/resimulate/commands/replay.py @@ -0,0 +1 @@ +# https://github.com/Textualize/rich/blob/master/examples/dynamic_progress.py diff --git a/src/resimulate.py b/resimulate/resimulate.py similarity index 57% rename from src/resimulate.py rename to resimulate/resimulate.py index 17a9a94..a1956c3 100755 --- a/src/resimulate.py +++ b/resimulate/resimulate.py @@ -1,7 +1,12 @@ #!/usr/bin/env python # PYTHON_ARGCOMPLETE_OK -import argparse, argcomplete +import argparse + +import argcomplete +from pySim.apdu_source.gsmtap import GsmtapApduSource + +from commands.record import Recorder parser = argparse.ArgumentParser( description="ReSIMulate is a terminal application built for eSIM and SIM-specific APDU analysis. It captures APDU commands, saves them, and replays them to facilitate differential testing, ensuring accurate validation and debugging of SIM interactions.", @@ -14,10 +19,15 @@ subparsers = parser.add_subparsers( required=True, help="Available commands: record, replay", ) +parser.add_argument( + "-v", "--verbose", action="store_true", help="Enable verbose output during replay." +) +parser.add_argument("--version", action="version", version="%(prog)s 0.1") # Record command record_parser = subparsers.add_parser( - "record", help="Record APDU commands from a specified source." + "record", + help="Record APDU commands from a specified source. Uses the SIMtrace2 GSMTAP to capture APDUs via UDP.", ) record_parser.add_argument( "-o", @@ -27,18 +37,20 @@ record_parser.add_argument( help="File to save recorded APDU commands (e.g., 'commands.apdu').", ) record_parser.add_argument( - "-d", - "--device", - type=str, - default="default_device", - help="Device or interface to listen for APDU commands (default: 'default_device').", + "-i", + "--bind-ip", + default="127.0.0.1", + help="Local IP address to which to bind the UDP port. (default: '127.0.0.1')", +) +record_parser.add_argument( + "-p", "--bind-port", default=4729, help="Local UDP port. (default: '4729')" ) record_parser.add_argument( "-t", "--timeout", type=int, - default=30, - help="Timeout in seconds for recording (default: 30).", + default=10, + help="Timeout in seconds for recording (default: 10).", ) # Replay command @@ -49,7 +61,7 @@ replay_parser.add_argument( "-i", "--input", required=True, - type=str, + type=argparse.FileType("r"), help="File containing APDU commands to replay (e.g., 'commands.apdu').", ) replay_parser.add_argument( @@ -59,10 +71,19 @@ replay_parser.add_argument( default="default_device", help="Target simtrace device to send APDU commands (default: 'default_device').", ) -replay_parser.add_argument( - "-v", "--verbose", action="store_true", help="Enable verbose output during replay." -) if __name__ == "__main__": - argcomplete.autocomplete(parser) + # TODO: Configure argcomplete for shell tab completion + # argcomplete.autocomplete(parser) args = parser.parse_args() + + if args.command == "record": + source = GsmtapApduSource(args.bind_ip, args.bind_port) + recorder = Recorder(source) + recorder.record(args.output, args.timeout) + + elif args.command == "replay": + pass + + else: + raise ValueError(f"Unsupported command: {args.command}") diff --git a/src/commands/replay.py b/resimulate/util/__init__.py similarity index 100% rename from src/commands/replay.py rename to resimulate/util/__init__.py diff --git a/resimulate/util/dummy_sim_link.py b/resimulate/util/dummy_sim_link.py new file mode 100644 index 0000000..f9e16a9 --- /dev/null +++ b/resimulate/util/dummy_sim_link.py @@ -0,0 +1,41 @@ +from osmocom.utils import h2i +from pySim.transport import LinkBase + + +# Taken from the pySim project and modified for the ReSIMulate project +class DummySimLink(LinkBase): + """A dummy implementation of the LinkBase abstract base class. Currently required + as the UiccCardBase doesn't work without SimCardCommands, which in turn require + a LinkBase implementation talking to a card. + + In the tracer, we don't actually talk to any card, so we simply drop everything + and claim it is successful. + + The UiccCardBase / SimCardCommands should be refactored to make this obsolete later. + """ + + def __init__(self, debug: bool = False, **kwargs): + super().__init__(**kwargs) + self._debug = debug + self._atr = h2i("3B9F96801F878031E073FE211B674A4C753034054BA9") + + def __str__(self) -> str: + return "dummy" + + def _send_apdu(self, pdu) -> tuple[list, str]: + return [], "9000" + + def connect(self): + pass + + def disconnect(self): + pass + + def _reset_card(self): + return 1 + + def get_atr(self) -> list[int]: + return self._atr + + def wait_for_card(self): + pass diff --git a/resimulate/util/logger.py b/resimulate/util/logger.py new file mode 100644 index 0000000..bb28f25 --- /dev/null +++ b/resimulate/util/logger.py @@ -0,0 +1,30 @@ +import logging +from rich.logging import RichHandler +from rich.console import Console + + +class RichLogger: + _instance = None + + def __new__(cls, console: Console = None): + if cls._instance is None: + cls._instance = super(RichLogger, cls).__new__(cls) + cls._instance._initialize(console) + return cls._instance + + def _initialize(self, console: Console = None): + logging.basicConfig( + level="NOTSET", + format="%(message)s", + datefmt="[%X]", + handlers=[RichHandler(rich_tracebacks=True, console=console)], + ) + self.logger = logging.getLogger("rich") + + def get_logger(self, console: Console = None) -> logging.Logger: + if console: + self._initialize(console) + return self.logger + + +log = RichLogger().get_logger() diff --git a/resimulate/util/tracer.py b/resimulate/util/tracer.py new file mode 100644 index 0000000..937467e --- /dev/null +++ b/resimulate/util/tracer.py @@ -0,0 +1,88 @@ +from queue import Queue + +from pySim.apdu import ApduDecoder, CardReset +from pySim.apdu.ts_31_102 import ApduCommands as UsimApduCommands +from pySim.apdu.ts_102_221 import ApduCommands as UiccApduCommands +from pySim.apdu.ts_102_221 import UiccSelect, UiccStatus +from pySim.apdu_source import ApduSource +from pySim.cards import UiccCardBase +from pySim.commands import SimCardCommands +from pySim.euicc import CardApplicationECASD, CardApplicationISDR +from pySim.runtime import RuntimeState +from pySim.ts_31_102 import CardApplicationUSIM +from pySim.ts_31_103 import CardApplicationISIM +from pySim.ts_102_221 import CardProfileUICC + +from util.dummy_sim_link import DummySimLink +from util.logger import log + +APDU_COMMANDS = UiccApduCommands + UsimApduCommands + + +# Taken from the pySim project and modified for the ReSIMulate project +class Tracer: + def __init__(self, source: ApduSource): + # we assume a generic UICC profile; as all APDUs return 9000 in DummySimLink above, + # all CardProfileAddon (including SIM) will probe successful. + profile = CardProfileUICC() + profile.add_application(CardApplicationUSIM()) + profile.add_application(CardApplicationISIM()) + profile.add_application(CardApplicationISDR()) + profile.add_application(CardApplicationECASD()) + + scc = SimCardCommands(transport=DummySimLink()) + card = UiccCardBase(scc) + self.runtime_state = RuntimeState(card, profile) + self.apdu_decoder = ApduDecoder(APDU_COMMANDS) + + self.suppress_status = True + self.suppress_select = True + self.show_raw_apdu = False + self.source = source + + def main(self, package_queue: Queue): + """Main loop of tracer: Iterates over all Apdu received from source.""" + apdu_counter = 0 + while True: + # obtain the next APDU from the source (blocking read) + try: + apdu = self.source.read() + apdu_counter = apdu_counter + 1 + except StopIteration: + log.debug("%i APDUs parsed, stop iteration." % apdu_counter) + package_queue.task_done() + return + except Exception as e: + log.error("Error reading APDU: %s", e) + continue + + if apdu is None: + log.debug("Received None APDU") + continue + + if isinstance(apdu, CardReset): + log.debug("Resetting runtime state") + self.runtime_state.reset() + continue + + # ask ApduDecoder to look-up (INS,CLA) + instantiate an ApduCommand derived + apdu_command = self.apdu_decoder.input(apdu) + # process the APDU (may modify the RuntimeState) + try: + apdu_command.process(self.runtime_state) + except ValueError as e: + log.error("Error processing APDU: %s", e) + continue + except AttributeError as e: + log.error("Error processing APDU: %s", e) + return + + # Avoid cluttering the log with too much verbosity + if self.suppress_select and isinstance(apdu_command, UiccSelect): + log.debug("Suppressing UiccSelect") + continue + if self.suppress_status and isinstance(apdu_command, UiccStatus): + log.debug("Suppressing UiccStatus") + continue + + package_queue.put((apdu, apdu_command))