fix: error handling during replay

This commit is contained in:
2025-02-03 21:14:45 +01:00
parent 20bcf565f4
commit 84070bb84a
11 changed files with 309 additions and 84 deletions

42
.vscode/launch.json vendored
View File

@@ -10,7 +10,13 @@
"request": "launch",
"program": "${workspaceFolder}/resimulate.py",
"console": "integratedTerminal",
"args": ["record", "-o", "test.apdu"],
"args": [
"record",
"-o",
"${input:promptFilename}",
"--isd-r",
"${input:pickISDR}"
],
"justMyCode": false
},
{
@@ -19,8 +25,40 @@
"request": "launch",
"program": "${workspaceFolder}/resimulate.py",
"console": "integratedTerminal",
"args": ["replay", "-i", "test.pkl"],
"args": [
"replay",
"-i",
"${input:pickFile}",
"--src-isd-r",
"${input:pickISDR}",
"--target-isd-r",
"${input:pickISDR}"
],
"justMyCode": false
}
],
"inputs": [
{
"id": "pickFile",
"type": "command",
"command": "extension.commandvariable.file.pickFile",
"args": {
"description": "Path to recording file.",
"fromFolder": { "fixed": "${workspaceFolder}" }
}
},
{
"type": "promptString",
"id": "promptFilename",
"description": "Name of the recording file."
},
{
"type": "pickString",
"id": "pickISDR",
"description": "Pick the ISD-R",
"options": ["default", "5ber"],
"default": "default"
}
]
}

View File

@@ -2,6 +2,7 @@
# PYTHON_ARGCOMPLETE_OK
import argparse
import logging
import argcomplete
from pySim.apdu_source.gsmtap import GsmtapApduSource
@@ -9,6 +10,7 @@ from rich_argparse import RichHelpFormatter
from resimulate.commands.record import Recorder
from resimulate.commands.replay import Replayer
from resimulate.util.logger import log
parser = argparse.ArgumentParser(
description="ReSIMulate is a terminal application built for eSIM and SIM-specific APDU analysis. It captures APDU commands, saves them, and replays them to facilitate differential testing, ensuring accurate validation and debugging of SIM interactions.",
@@ -38,14 +40,21 @@ record_parser.add_argument(
type=str,
help="File to save recorded APDU commands (e.g., 'commands.apdu').",
)
record_parser.add_argument(
"--isd-r",
type=str,
default="default",
choices=["default", "5ber"],
help="ISD-R to use for recording APDU commands (default: '%(default)s').",
)
record_parser.add_argument(
"-i",
"--bind-ip",
default="127.0.0.1",
help="Local IP address to which to bind the UDP port. (default: '127.0.0.1')",
help="Local IP address to which to bind the UDP port. (default: %(default)s)",
)
record_parser.add_argument(
"-p", "--bind-port", default=4729, help="Local UDP port. (default: '4729')"
"-p", "--bind-port", default=4729, help="Local UDP port. (default: %(default)s)"
)
record_parser.add_argument(
"-t",
@@ -71,14 +80,14 @@ replay_parser.add_argument(
"--pcsc-device",
type=int,
default=0,
help="Target PC/SC device to send APDU commands (default: 0).",
help="Target PC/SC device to send APDU commands (default: %(default)s).",
)
replay_parser.add_argument(
"--isd-r-aid",
"--target-isd-r",
type=str,
default="default",
choices=["default", "5ber"],
help="ISD-R AID to use for replaying APDU commands (default: 'default').",
help="Target ISD-R AID to use for replaying APDU commands (default: '%(default)s').",
)
if __name__ == "__main__":
@@ -86,13 +95,15 @@ if __name__ == "__main__":
# argcomplete.autocomplete(parser)
args = parser.parse_args()
log.setLevel(logging.DEBUG if args.verbose else logging.INFO)
if args.command == "record":
source = GsmtapApduSource(args.bind_ip, args.bind_port)
recorder = Recorder(source)
recorder = Recorder(source, args.isd_r)
recorder.record(args.output, args.timeout)
elif args.command == "replay":
replayer = Replayer(args.pcsc_device, args.isd_r_aid)
replayer = Replayer(args.pcsc_device, args.target_isd_r)
replayer.replay(args.input.name)
else:

79
resimulate/card.py Normal file
View File

@@ -0,0 +1,79 @@
from pySim.transport import LinkBase
from pySim.commands import SimCardCommands
from pySim.filesystem import CardModel, CardApplication
from pySim.cards import SimCardBase, UiccCardBase
from pySim.runtime import RuntimeState
from pySim.profile import CardProfile
from pySim.ts_102_221 import CardProfileUICC
from pySim.utils import all_subclasses
from pySim.exceptions import SwMatchError
from pySim.euicc import CardApplicationISDR
from resimulate.util.enums import ISDR_AID
from resimulate.util.logger import log
# Card initialization taken from pySim card_init function and modified for ReSIMulate
class Card:
card: SimCardBase | None
profile: CardProfile | None
generic_card: bool = False
def __init__(self, sim_link: LinkBase):
self.sim_link = sim_link
self.sim_card_commands = SimCardCommands(transport=sim_link)
def init_card(self, target_ids_r: ISDR_AID, timeout: int = 3) -> SimCardBase:
self.sim_link.wait_for_card(timeout=timeout)
self.card = UiccCardBase(self.sim_card_commands)
if not self.card.probe():
log.warning("Could not detect card type! Assuming a generic card type...")
self.card = SimCardBase(self.sim_card_commands)
self.generic_card = True
self.profile = CardProfile.pick(self.sim_card_commands)
if self.profile is None:
log.warning("Unsupported card type!")
return self.card
if self.generic_card and isinstance(self.profile, CardProfileUICC):
self.card._adm_chv_num = 0x0A
log.debug("Profile of type %s detected." % self.profile)
if isinstance(self.profile, CardProfileUICC):
for app_cls in all_subclasses(CardApplication):
# skip any intermediary sub-classes such as CardApplicationSD
if hasattr(app_cls, "_" + app_cls.__name__ + "__intermediate"):
continue
self.profile.add_application(app_cls())
# We have chosen SimCard() above, but we now know it actually is an UICC
# so it's safe to assume it supports USIM application (which we're adding above).
# IF we don't do this, we will have a SimCard but try USIM specific commands like
# the update_ust method (see https://osmocom.org/issues/6055)
if self.generic_card:
self.card = UiccCardBase(self.sim_card_commands)
runtime_state = RuntimeState(self.card, self.profile)
CardModel.apply_matching_models(self.sim_card_commands, runtime_state)
# inform the transport that we can do context-specific SW interpretation
self.sim_link.set_sw_interpreter(runtime_state)
# try to obtain the EID, if any
isd_r = runtime_state.mf.applications.get(target_ids_r.value, None)
if isd_r:
runtime_state.lchan[0].select_file(isd_r)
try:
runtime_state.identity["EID"] = CardApplicationISDR.get_eid(
self.sim_card_commands
)
except SwMatchError:
# has ISD-R but not a SGP.22/SGP.32 eUICC - maybe SGP.02?
pass
finally:
runtime_state.reset()
return self.card

View File

@@ -1,4 +1,3 @@
import pickle
import signal
from queue import Empty, Queue, ShutDown
from threading import Thread
@@ -11,33 +10,30 @@ from rich.live import Live
from rich.progress import BarColumn, Progress, TextColumn, TimeElapsedColumn
from rich.text import Text
from resimulate.recording import Recording
from resimulate.util.enums import ISDR_AID
from resimulate.util.logger import log
from resimulate.util.tracer import Tracer
class Recorder:
def __init__(self, source: ApduSource):
self.tracer = Tracer(source)
self.captured_apdus = []
def __init__(self, source: ApduSource, src_isd_r: str):
isd_r_aid = ISDR_AID.get_aid(src_isd_r)
self.tracer = Tracer(source, isd_r_aid=isd_r_aid)
self.src_isd_r_aid = src_isd_r
self.package_queue: Queue[tuple[Apdu, ApduCommand]] = Queue()
self.tracer_thread = Thread(
target=self.tracer.main, args=(self.package_queue,), daemon=True
)
self.recording = Recording()
signal.signal(signal.SIGINT, self.__signal_handler)
def __signal_handler(self, sig, frame):
log.debug("Received signal %s, shutting down capture.", sig)
self.package_queue.shutdown(immediate=True)
def record(self, output_path: str, timeout: int):
capture_progress = Progress(
TimeElapsedColumn(),
TextColumn("{task.completed}"),
TextColumn("[bold blue]{task.fields[packet_type]}"),
TextColumn("[bold green]{task.fields[packet_description]}"),
TextColumn("{task.fields[packet_code]}"),
)
overall_progress = Progress(
TimeElapsedColumn(),
@@ -55,12 +51,12 @@ class Recorder:
)
overall_task_id = overall_progress.add_task(
f"[bold red]{len(self.captured_apdus)} packets captured!",
f"[bold red]{len(self.recording.apdus)} packets captured!",
start=True,
total=None,
)
with Live(main_group) as live:
with Live(main_group):
self.tracer_thread.start()
while self.tracer_thread.is_alive():
@@ -73,18 +69,7 @@ class Recorder:
break
log.info("Captured %s %s", apdu_command._name, apdu)
""" capture_task_id = capture_progress.add_task(
"",
completed=len(self.captured_apdus),
packet_type=str(apdu_command._name) or "",
packet_description=str(apdu_command.path_str),
packet_code=str(apdu_command.col_sw),
) """
self.captured_apdus.append(apdu)
""" capture_progress.stop_task(capture_task_id) """
self.recording.apdus.append(apdu)
except TimeoutError:
log.debug("Timeout reached, stopping capture.")
break
@@ -100,7 +85,7 @@ class Recorder:
overall_progress.update(
overall_task_id,
description=f"[bold green]{len(self.captured_apdus)} packet(s) captured!",
description=f"[bold green]{len(self.recording.apdus)} packet(s) captured!",
)
overall_progress.update(
@@ -108,13 +93,10 @@ class Recorder:
description="[bold yellow]Saving captured APDU commands...",
)
log.debug("Saving captured APDU commands to %s", output_path)
with open(output_path, "wb") as f:
pickle.dump(self.captured_apdus, f)
self.recording.save_file(output_path)
overall_progress.update(
overall_task_id,
description="[bold green]Captured %s APDU packets!"
% len(self.captured_apdus),
% len(self.recording.apdus),
)

View File

@@ -1,31 +1,57 @@
import pickle
from contextlib import redirect_stdout
from osmocom.utils import b2h
from osmocom.utils import b2h, h2b, h2i, i2h
from pySim.apdu import Apdu
from pySim.app import init_card
from pySim.card_handler import CardHandler
from pySim.transport.pcsc import PcscSimLink
from pySim.ts_102_221 import CardProfileUICC
from rich.align import Align
from rich.console import Group
from rich.live import Live
from rich.progress import BarColumn, Progress, TextColumn, TimeElapsedColumn
from rich.text import Text
from resimulate.util.logger import LoggerWriter, log
from resimulate.util.pcsc_link import PcscLink
from resimulate.card import Card
from resimulate.recording import Recording
from resimulate.util.enums import ISDR_AID
from resimulate.util.logger import log
class Replayer:
def __init__(self, device: int, isd_r_aid: str):
def __init__(self, device: int, target_isd_r: str):
self.device = device
self.isd_r_aid = isd_r_aid
self.target_isd_r_aid = ISDR_AID.get_aid(target_isd_r)
def __get_remaining_bytes(self, link: PcscSimLink, bytes_to_receive: int, cla: int):
log.debug("Retrieving remaining bytes: %d", bytes_to_receive)
apdu = Apdu(i2h([cla]) + "C00000" + i2h([bytes_to_receive]))
return self.__send_apdu(link, apdu)
def __resend_with_modified_le(self, link: PcscSimLink, apdu: Apdu, le: int):
log.debug("Resending APDU with modified Le: %d", le)
modified_apdu = Apdu(b2h(apdu.cmd)[:-2] + i2h([le]))
return self.__send_apdu(link, modified_apdu)
def __send_apdu(self, link: PcscSimLink, apdu: Apdu):
if self.recording.src_isd_r_aid and self.target_isd_r_aid:
if b2h(apdu.cmd_data) == self.recording.src_isd_r_aid.value:
apdu.cmd_data = h2b(self.target_isd_r_aid.value)
log.debug(
"Sending APDU(%s) where CLA(%s), INS(%s), P1(%s), P2(%s), Lc(%s), DATA(%s), P3/Le(%s)",
b2h(apdu.cmd),
i2h([apdu.cla]),
i2h([apdu.ins]),
i2h([apdu.p1]),
i2h([apdu.p2]),
i2h([apdu.lc]),
b2h(apdu.cmd_data),
i2h([apdu.p3]),
)
data, resp = link.send_tpdu(b2h(apdu.cmd))
log.debug("Received Data: %s, SW: %s", data, resp)
return data, resp
def replay(self, input_path: str):
progress = Progress(
TimeElapsedColumn(),
BarColumn(),
TextColumn("{task.description}"),
TimeElapsedColumn(), BarColumn(), TextColumn("{task.description}")
)
main_group = Group(
@@ -43,56 +69,81 @@ class Replayer:
)
with Live(main_group):
with open(input_path, "rb") as f:
apdus: list[Apdu] = pickle.load(f)
self.recording = Recording.load_file(input_path)
progress.update(
progress_id, description="[bold green]Initializing PC/SC link..."
)
try:
pcsc_link = PcscSimLink() # PcscLink(device_index=self.device)
runtime_state, self.card = init_card(pcsc_link)
log.debug("PC/SC link initialized: %s", pcsc_link)
card = Card(pcsc_link).init_card(target_ids_r=self.target_isd_r_aid)
log.debug("Initialized card of type: %s", card.name)
except Exception as e:
log.error("Failed to initialize card: %s", e)
log.exception(e)
progress.update(
progress_id, description=":x: [bold red]Failed to initialize card."
)
return
try:
with pcsc_link as link:
for id, apdu in enumerate(apdus):
log.debug("Replaying APDUs...")
for idx, apdu in enumerate(self.recording.apdus, start=1):
progress.update(
progress_id,
total=len(apdus),
completed=id + 1,
description=f"Replaying APDU {id + 1} / {len(apdus)}",
total=len(self.recording.apdus),
completed=idx,
description=f"Replaying APDU {idx} / {len(self.recording.apdus)}",
)
cmd, resp = link.send_tpdu(b2h(apdu.cmd))
log.debug("APDU: %s, SW: %s", b2h(apdu.cmd), resp)
if resp != b2h(apdu.sw):
log.info(
"Received APDU %s response does not match the expected APDU: %s != %s",
b2h(apdu.cmd),
resp,
b2h(apdu.sw),
data, resp = self.__send_apdu(link, apdu)
if resp == b2h(apdu.sw):
continue
log.debug(
"Unexpected SW: %s (expected: %s)", resp, b2h(apdu.sw)
)
if resp.startswith("61"):
remaining_bytes = h2i(resp[2:])[0]
log.debug(
"Normal processing, %s bytes still available",
remaining_bytes,
)
self.__get_remaining_bytes(link, remaining_bytes, apdu.cla)
continue
elif resp.startswith("6c"):
le = h2i(resp[2:])[0]
log.warning(
"Wrong length, resending with modified Le %s", le
)
self.__resend_with_modified_le(link, apdu, le)
continue
error, description = CardProfileUICC().interpret_sw(resp)
if error:
log.error("%s: %s", error, description)
else:
log.error(
"Unexpected response: %s != %s", resp, b2h(apdu.sw)
)
except KeyboardInterrupt:
log.debug("Replay interrupted.")
progress.update(
progress_id, description=":x: [bold red]Replay interrupted."
)
return
except Exception as e:
log.error("Error during replay: %s", e)
log.exception(e)
progress.update(
progress_id, description=":x: [bold red]Error during replay."
)
return
log.debug("Replay finished.")
progress.update(
progress_id,
description=":white_check_mark: [bold green]Replay finished.",
)
else:
log.debug("Replay finished.")
progress.update(
progress_id,
description=":white_check_mark: [bold green]Replay finished.",
)

View File

@@ -1,2 +1,6 @@
class PcscError(Exception):
pass
class CardTypeException(Exception):
pass

36
resimulate/recording.py Normal file
View File

@@ -0,0 +1,36 @@
import pickle
from os.path import exists, isfile
from pySim.apdu import Apdu
from resimulate.util.enums import ISDR_AID
from resimulate.util.logger import log
class Recording:
apdus: list[Apdu]
src_isd_r_aid: ISDR_AID
def __init__(self, src_isd_r: str = "default"):
self.src_isd_r_aid = ISDR_AID.get_aid(src_isd_r)
self.apdus = []
@staticmethod
def load_file(file_path: str) -> "Recording":
if not exists(file_path) or not isfile(file_path):
raise FileNotFoundError(f"File {file_path} not found.")
with open(file_path, "rb") as f:
recording = pickle.load(f)
if not isinstance(recording, Recording):
raise TypeError(f"File {file_path} does not contain a Recording object.")
log.debug("Loaded %d APDUs from %s", len(recording.apdus), file_path)
return recording
def save_file(self, file_path: str) -> None:
log.debug("Saving captured APDU commands to %s", file_path)
with open(file_path, "wb") as f:
pickle.dump(self, f)

View File

@@ -1,6 +1,12 @@
import enum
from typing import Union
class ISDR_AID(str, enum.Enum):
_DEFAULT = "A0000005591010FFFFFFFF8900000100"
_5BER = "A0000005591010FFFFFFFF8900050500"
@staticmethod
def get_aid(aid_description: str) -> Union["ISDR_AID", None]:
mapping = {"default": ISDR_AID._DEFAULT, "5ber": ISDR_AID._5BER}
return mapping.get(aid_description)

View File

@@ -1,5 +1,5 @@
from osmocom.utils import Hexstr, h2i, i2h
from pySim.transport import LinkBaseTpdu, ProactiveHandler
from pySim.transport import LinkBaseTpdu
from pySim.utils import ResTuple
from smartcard import System
from smartcard.CardConnection import CardConnection
@@ -14,6 +14,7 @@ from smartcard.pcsc.PCSCReader import PCSCReader
from resimulate.exceptions import PcscError
from resimulate.util.logger import log
from resimulate.util.proactive_handler import ProactiveHandler
class PcscLink(LinkBaseTpdu):
@@ -32,7 +33,7 @@ class PcscLink(LinkBaseTpdu):
)
def __str__(self) -> str:
return "PCSC[%s]" % (self._reader)
return "PCSC[%s]" % (self.pcsc_device)
def __del__(self):
self.disconnect()
@@ -71,7 +72,11 @@ class PcscLink(LinkBaseTpdu):
log.debug("Connected to device %s", self.pcsc_device)
def disconnect(self):
self.card_connection.disconnect()
try:
self.card_connection.disconnect()
except AttributeError:
pass
log.debug("Disconnected from device %s", self.pcsc_device)
def _reset_card(self):

View File

@@ -0,0 +1,12 @@
from pySim.transport import ProactiveHandler as Proact
from pySim.transport import ProactiveCommand
from resimulate.util.logger import log
class ProactiveHandler(Proact):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def receive_fetch(self, cmd: ProactiveCommand):
log.debug("Handling proactive command:", cmd)

View File

@@ -18,6 +18,7 @@ from pySim.ts_31_103 import CardApplicationISIM
from pySim.ts_102_221 import CardProfileUICC
from resimulate.util.dummy_sim_link import DummySimLink
from resimulate.util.enums import ISDR_AID
from resimulate.util.logger import log
APDU_COMMANDS = (
@@ -27,13 +28,13 @@ APDU_COMMANDS = (
# Taken from the pySim project and modified for the ReSIMulate project
class Tracer:
def __init__(self, source: ApduSource):
def __init__(self, source: ApduSource, isd_r_aid: ISDR_AID):
# we assume a generic UICC profile; as all APDUs return 9000 in DummySimLink above,
# all CardProfileAddon (including SIM) will probe successful.
profile = CardProfileUICC()
profile.add_application(CardApplicationUSIM())
profile.add_application(CardApplicationISIM())
profile.add_application(CardApplicationISDR())
profile.add_application(CardApplicationISDR(aid=isd_r_aid.value))
profile.add_application(CardApplicationECASD())
profile.add_application(CardApplicationARAM())
profile.add_application(CardApplicationISD())