feat: basic packet capturing

This commit is contained in:
2025-01-12 04:13:04 +01:00
parent c984146374
commit cd03668490
9 changed files with 320 additions and 14 deletions

View File

@@ -0,0 +1,10 @@
# ReSIMulate
**ReSIMulate** is a terminal application for analyzing eSIM and SIM card interactions. It allows you to record APDU commands, save them, and replay them for differential testing and debugging.
## Features
- Record APDU commands from a device or interface.
- Replay saved APDU commands to a target device.
- Supports configurable timeouts, devices, and verbose output.
- Enables differential testing for SIM interactions.

View File

@@ -0,0 +1,115 @@
import pickle
from queue import Empty, Queue
from threading import Thread
import time
from pySim.apdu import Apdu, ApduCommand
from pySim.apdu_source.gsmtap import ApduSource
from rich.live import Live
from util.logger import log
from util.tracer import Tracer
from rich.console import Group
from rich.panel import Panel
from rich.progress import (
BarColumn,
Progress,
SpinnerColumn,
TextColumn,
TimeElapsedColumn,
)
class Recorder:
def __init__(self, source: ApduSource):
self.tracer = Tracer(source)
self.captured_apdus = []
self.package_queue: Queue[tuple[Apdu, ApduCommand]] = Queue()
self.tracer_thread = Thread(
target=self.tracer.main, args=(self.package_queue,), daemon=True
)
def record(self, output_path: str, timeout: int):
capture_progress = Progress(
TimeElapsedColumn(),
TextColumn("{task.completed}"),
TextColumn("[bold blue]{task.fields[packet_type]}"),
TextColumn("[bold green]{task.fields[packet_description]}"),
TextColumn("{task.fields[packet_code]}"),
)
overall_progress = Progress(
TimeElapsedColumn(),
BarColumn(),
TextColumn("{task.description}"),
)
main_group = Group(
Panel(capture_progress),
overall_progress,
)
overall_task_id = overall_progress.add_task(
f"[bold red]{len(self.captured_apdus)} packets captured!",
start=True,
total=None,
)
with Live(main_group):
self.tracer_thread.start()
while self.tracer_thread.is_alive():
try:
apdu, apdu_command = self.package_queue.get(timeout=timeout)
if apdu is None:
log.debug("No more APDU packets to capture.")
break
capture_task_id = capture_progress.add_task(
"",
completed=len(self.captured_apdus),
packet_type=str(apdu_command._name) or "",
packet_description=str(apdu_command.path_str),
packet_code=str(apdu_command.col_sw),
)
self.captured_apdus.append(apdu)
capture_progress.stop_task(capture_task_id)
except TimeoutError:
log.debug("Timeout reached, stopping capture.")
break
except Empty:
log.debug("No more APDU packets to capture.")
break
except UnboundLocalError as e:
log.debug("Error capturing APDU packets: %s", e)
break
overall_progress.update(
overall_task_id,
description=f"[bold green]{len(self.captured_apdus)} packet(s) captured!",
)
capture_progress.stop_task(capture_task_id)
overall_progress.update(
overall_task_id,
description="[bold yellow]Saving captured APDU commands...",
)
log.debug("Saving captured APDU commands to %s", output_path)
with open(output_path, "wb") as f:
pickle.dump(self.captured_apdus, f)
time.sleep(3)
overall_progress.update(
overall_task_id,
description="[bold green]Captured %s APDU packets!"
% len(self.captured_apdus),
)

View File

@@ -0,0 +1 @@
# https://github.com/Textualize/rich/blob/master/examples/dynamic_progress.py

View File

@@ -1,7 +1,12 @@
#!/usr/bin/env python #!/usr/bin/env python
# PYTHON_ARGCOMPLETE_OK # PYTHON_ARGCOMPLETE_OK
import argparse, argcomplete import argparse
import argcomplete
from pySim.apdu_source.gsmtap import GsmtapApduSource
from commands.record import Recorder
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description="ReSIMulate is a terminal application built for eSIM and SIM-specific APDU analysis. It captures APDU commands, saves them, and replays them to facilitate differential testing, ensuring accurate validation and debugging of SIM interactions.", description="ReSIMulate is a terminal application built for eSIM and SIM-specific APDU analysis. It captures APDU commands, saves them, and replays them to facilitate differential testing, ensuring accurate validation and debugging of SIM interactions.",
@@ -14,10 +19,15 @@ subparsers = parser.add_subparsers(
required=True, required=True,
help="Available commands: record, replay", help="Available commands: record, replay",
) )
parser.add_argument(
"-v", "--verbose", action="store_true", help="Enable verbose output during replay."
)
parser.add_argument("--version", action="version", version="%(prog)s 0.1")
# Record command # Record command
record_parser = subparsers.add_parser( record_parser = subparsers.add_parser(
"record", help="Record APDU commands from a specified source." "record",
help="Record APDU commands from a specified source. Uses the SIMtrace2 GSMTAP to capture APDUs via UDP.",
) )
record_parser.add_argument( record_parser.add_argument(
"-o", "-o",
@@ -27,18 +37,20 @@ record_parser.add_argument(
help="File to save recorded APDU commands (e.g., 'commands.apdu').", help="File to save recorded APDU commands (e.g., 'commands.apdu').",
) )
record_parser.add_argument( record_parser.add_argument(
"-d", "-i",
"--device", "--bind-ip",
type=str, default="127.0.0.1",
default="default_device", help="Local IP address to which to bind the UDP port. (default: '127.0.0.1')",
help="Device or interface to listen for APDU commands (default: 'default_device').", )
record_parser.add_argument(
"-p", "--bind-port", default=4729, help="Local UDP port. (default: '4729')"
) )
record_parser.add_argument( record_parser.add_argument(
"-t", "-t",
"--timeout", "--timeout",
type=int, type=int,
default=30, default=10,
help="Timeout in seconds for recording (default: 30).", help="Timeout in seconds for recording (default: 10).",
) )
# Replay command # Replay command
@@ -49,7 +61,7 @@ replay_parser.add_argument(
"-i", "-i",
"--input", "--input",
required=True, required=True,
type=str, type=argparse.FileType("r"),
help="File containing APDU commands to replay (e.g., 'commands.apdu').", help="File containing APDU commands to replay (e.g., 'commands.apdu').",
) )
replay_parser.add_argument( replay_parser.add_argument(
@@ -59,10 +71,19 @@ replay_parser.add_argument(
default="default_device", default="default_device",
help="Target simtrace device to send APDU commands (default: 'default_device').", help="Target simtrace device to send APDU commands (default: 'default_device').",
) )
replay_parser.add_argument(
"-v", "--verbose", action="store_true", help="Enable verbose output during replay."
)
if __name__ == "__main__": if __name__ == "__main__":
argcomplete.autocomplete(parser) # TODO: Configure argcomplete for shell tab completion
# argcomplete.autocomplete(parser)
args = parser.parse_args() args = parser.parse_args()
if args.command == "record":
source = GsmtapApduSource(args.bind_ip, args.bind_port)
recorder = Recorder(source)
recorder.record(args.output, args.timeout)
elif args.command == "replay":
pass
else:
raise ValueError(f"Unsupported command: {args.command}")

View File

@@ -0,0 +1,41 @@
from osmocom.utils import h2i
from pySim.transport import LinkBase
# Taken from the pySim project and modified for the ReSIMulate project
class DummySimLink(LinkBase):
"""A dummy implementation of the LinkBase abstract base class. Currently required
as the UiccCardBase doesn't work without SimCardCommands, which in turn require
a LinkBase implementation talking to a card.
In the tracer, we don't actually talk to any card, so we simply drop everything
and claim it is successful.
The UiccCardBase / SimCardCommands should be refactored to make this obsolete later.
"""
def __init__(self, debug: bool = False, **kwargs):
super().__init__(**kwargs)
self._debug = debug
self._atr = h2i("3B9F96801F878031E073FE211B674A4C753034054BA9")
def __str__(self) -> str:
return "dummy"
def _send_apdu(self, pdu) -> tuple[list, str]:
return [], "9000"
def connect(self):
pass
def disconnect(self):
pass
def _reset_card(self):
return 1
def get_atr(self) -> list[int]:
return self._atr
def wait_for_card(self):
pass

30
resimulate/util/logger.py Normal file
View File

@@ -0,0 +1,30 @@
import logging
from rich.logging import RichHandler
from rich.console import Console
class RichLogger:
_instance = None
def __new__(cls, console: Console = None):
if cls._instance is None:
cls._instance = super(RichLogger, cls).__new__(cls)
cls._instance._initialize(console)
return cls._instance
def _initialize(self, console: Console = None):
logging.basicConfig(
level="NOTSET",
format="%(message)s",
datefmt="[%X]",
handlers=[RichHandler(rich_tracebacks=True, console=console)],
)
self.logger = logging.getLogger("rich")
def get_logger(self, console: Console = None) -> logging.Logger:
if console:
self._initialize(console)
return self.logger
log = RichLogger().get_logger()

88
resimulate/util/tracer.py Normal file
View File

@@ -0,0 +1,88 @@
from queue import Queue
from pySim.apdu import ApduDecoder, CardReset
from pySim.apdu.ts_31_102 import ApduCommands as UsimApduCommands
from pySim.apdu.ts_102_221 import ApduCommands as UiccApduCommands
from pySim.apdu.ts_102_221 import UiccSelect, UiccStatus
from pySim.apdu_source import ApduSource
from pySim.cards import UiccCardBase
from pySim.commands import SimCardCommands
from pySim.euicc import CardApplicationECASD, CardApplicationISDR
from pySim.runtime import RuntimeState
from pySim.ts_31_102 import CardApplicationUSIM
from pySim.ts_31_103 import CardApplicationISIM
from pySim.ts_102_221 import CardProfileUICC
from util.dummy_sim_link import DummySimLink
from util.logger import log
APDU_COMMANDS = UiccApduCommands + UsimApduCommands
# Taken from the pySim project and modified for the ReSIMulate project
class Tracer:
def __init__(self, source: ApduSource):
# we assume a generic UICC profile; as all APDUs return 9000 in DummySimLink above,
# all CardProfileAddon (including SIM) will probe successful.
profile = CardProfileUICC()
profile.add_application(CardApplicationUSIM())
profile.add_application(CardApplicationISIM())
profile.add_application(CardApplicationISDR())
profile.add_application(CardApplicationECASD())
scc = SimCardCommands(transport=DummySimLink())
card = UiccCardBase(scc)
self.runtime_state = RuntimeState(card, profile)
self.apdu_decoder = ApduDecoder(APDU_COMMANDS)
self.suppress_status = True
self.suppress_select = True
self.show_raw_apdu = False
self.source = source
def main(self, package_queue: Queue):
"""Main loop of tracer: Iterates over all Apdu received from source."""
apdu_counter = 0
while True:
# obtain the next APDU from the source (blocking read)
try:
apdu = self.source.read()
apdu_counter = apdu_counter + 1
except StopIteration:
log.debug("%i APDUs parsed, stop iteration." % apdu_counter)
package_queue.task_done()
return
except Exception as e:
log.error("Error reading APDU: %s", e)
continue
if apdu is None:
log.debug("Received None APDU")
continue
if isinstance(apdu, CardReset):
log.debug("Resetting runtime state")
self.runtime_state.reset()
continue
# ask ApduDecoder to look-up (INS,CLA) + instantiate an ApduCommand derived
apdu_command = self.apdu_decoder.input(apdu)
# process the APDU (may modify the RuntimeState)
try:
apdu_command.process(self.runtime_state)
except ValueError as e:
log.error("Error processing APDU: %s", e)
continue
except AttributeError as e:
log.error("Error processing APDU: %s", e)
return
# Avoid cluttering the log with too much verbosity
if self.suppress_select and isinstance(apdu_command, UiccSelect):
log.debug("Suppressing UiccSelect")
continue
if self.suppress_status and isinstance(apdu_command, UiccStatus):
log.debug("Suppressing UiccStatus")
continue
package_queue.put((apdu, apdu_command))