diff --git a/.vscode/launch.json b/.vscode/launch.json index 6a1e877..a32e49d 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -10,7 +10,13 @@ "request": "launch", "program": "${workspaceFolder}/resimulate.py", "console": "integratedTerminal", - "args": ["record", "-o", "test.apdu"], + "args": [ + "record", + "-o", + "${input:promptFilename}", + "--isd-r", + "${input:pickISDR}" + ], "justMyCode": false }, { @@ -19,8 +25,40 @@ "request": "launch", "program": "${workspaceFolder}/resimulate.py", "console": "integratedTerminal", - "args": ["replay", "-i", "test.pkl"], + "args": [ + "replay", + "-i", + "${input:pickFile}", + "--src-isd-r", + "${input:pickISDR}", + "--target-isd-r", + "${input:pickISDR}" + ], "justMyCode": false } + ], + "inputs": [ + { + "id": "pickFile", + "type": "command", + "command": "extension.commandvariable.file.pickFile", + "args": { + "description": "Path to recording file.", + "fromFolder": { "fixed": "${workspaceFolder}" } + } + }, + { + "type": "promptString", + "id": "promptFilename", + "description": "Name of the recording file." + }, + + { + "type": "pickString", + "id": "pickISDR", + "description": "Pick the ISD-R", + "options": ["default", "5ber"], + "default": "default" + } ] } diff --git a/resimulate.py b/resimulate.py index 44688b6..ccb284e 100755 --- a/resimulate.py +++ b/resimulate.py @@ -2,6 +2,7 @@ # PYTHON_ARGCOMPLETE_OK import argparse +import logging import argcomplete from pySim.apdu_source.gsmtap import GsmtapApduSource @@ -9,6 +10,7 @@ from rich_argparse import RichHelpFormatter from resimulate.commands.record import Recorder from resimulate.commands.replay import Replayer +from resimulate.util.logger import log parser = argparse.ArgumentParser( description="ReSIMulate is a terminal application built for eSIM and SIM-specific APDU analysis. It captures APDU commands, saves them, and replays them to facilitate differential testing, ensuring accurate validation and debugging of SIM interactions.", @@ -38,14 +40,21 @@ record_parser.add_argument( type=str, help="File to save recorded APDU commands (e.g., 'commands.apdu').", ) +record_parser.add_argument( + "--isd-r", + type=str, + default="default", + choices=["default", "5ber"], + help="ISD-R to use for recording APDU commands (default: '%(default)s').", +) record_parser.add_argument( "-i", "--bind-ip", default="127.0.0.1", - help="Local IP address to which to bind the UDP port. (default: '127.0.0.1')", + help="Local IP address to which to bind the UDP port. (default: %(default)s)", ) record_parser.add_argument( - "-p", "--bind-port", default=4729, help="Local UDP port. (default: '4729')" + "-p", "--bind-port", default=4729, help="Local UDP port. (default: %(default)s)" ) record_parser.add_argument( "-t", @@ -71,14 +80,14 @@ replay_parser.add_argument( "--pcsc-device", type=int, default=0, - help="Target PC/SC device to send APDU commands (default: 0).", + help="Target PC/SC device to send APDU commands (default: %(default)s).", ) replay_parser.add_argument( - "--isd-r-aid", + "--target-isd-r", type=str, default="default", choices=["default", "5ber"], - help="ISD-R AID to use for replaying APDU commands (default: 'default').", + help="Target ISD-R AID to use for replaying APDU commands (default: '%(default)s').", ) if __name__ == "__main__": @@ -86,13 +95,15 @@ if __name__ == "__main__": # argcomplete.autocomplete(parser) args = parser.parse_args() + log.setLevel(logging.DEBUG if args.verbose else logging.INFO) + if args.command == "record": source = GsmtapApduSource(args.bind_ip, args.bind_port) - recorder = Recorder(source) + recorder = Recorder(source, args.isd_r) recorder.record(args.output, args.timeout) elif args.command == "replay": - replayer = Replayer(args.pcsc_device, args.isd_r_aid) + replayer = Replayer(args.pcsc_device, args.target_isd_r) replayer.replay(args.input.name) else: diff --git a/resimulate/card.py b/resimulate/card.py new file mode 100644 index 0000000..be73bee --- /dev/null +++ b/resimulate/card.py @@ -0,0 +1,79 @@ +from pySim.transport import LinkBase +from pySim.commands import SimCardCommands +from pySim.filesystem import CardModel, CardApplication +from pySim.cards import SimCardBase, UiccCardBase +from pySim.runtime import RuntimeState +from pySim.profile import CardProfile +from pySim.ts_102_221 import CardProfileUICC +from pySim.utils import all_subclasses +from pySim.exceptions import SwMatchError +from pySim.euicc import CardApplicationISDR + +from resimulate.util.enums import ISDR_AID +from resimulate.util.logger import log + + +# Card initialization taken from pySim card_init function and modified for ReSIMulate +class Card: + card: SimCardBase | None + profile: CardProfile | None + generic_card: bool = False + + def __init__(self, sim_link: LinkBase): + self.sim_link = sim_link + self.sim_card_commands = SimCardCommands(transport=sim_link) + + def init_card(self, target_ids_r: ISDR_AID, timeout: int = 3) -> SimCardBase: + self.sim_link.wait_for_card(timeout=timeout) + + self.card = UiccCardBase(self.sim_card_commands) + if not self.card.probe(): + log.warning("Could not detect card type! Assuming a generic card type...") + self.card = SimCardBase(self.sim_card_commands) + self.generic_card = True + + self.profile = CardProfile.pick(self.sim_card_commands) + if self.profile is None: + log.warning("Unsupported card type!") + return self.card + + if self.generic_card and isinstance(self.profile, CardProfileUICC): + self.card._adm_chv_num = 0x0A + + log.debug("Profile of type %s detected." % self.profile) + + if isinstance(self.profile, CardProfileUICC): + for app_cls in all_subclasses(CardApplication): + # skip any intermediary sub-classes such as CardApplicationSD + if hasattr(app_cls, "_" + app_cls.__name__ + "__intermediate"): + continue + self.profile.add_application(app_cls()) + # We have chosen SimCard() above, but we now know it actually is an UICC + # so it's safe to assume it supports USIM application (which we're adding above). + # IF we don't do this, we will have a SimCard but try USIM specific commands like + # the update_ust method (see https://osmocom.org/issues/6055) + if self.generic_card: + self.card = UiccCardBase(self.sim_card_commands) + + runtime_state = RuntimeState(self.card, self.profile) + + CardModel.apply_matching_models(self.sim_card_commands, runtime_state) + + # inform the transport that we can do context-specific SW interpretation + self.sim_link.set_sw_interpreter(runtime_state) + + # try to obtain the EID, if any + isd_r = runtime_state.mf.applications.get(target_ids_r.value, None) + if isd_r: + runtime_state.lchan[0].select_file(isd_r) + try: + runtime_state.identity["EID"] = CardApplicationISDR.get_eid( + self.sim_card_commands + ) + except SwMatchError: + # has ISD-R but not a SGP.22/SGP.32 eUICC - maybe SGP.02? + pass + finally: + runtime_state.reset() + + return self.card diff --git a/resimulate/commands/record.py b/resimulate/commands/record.py index 0d48a98..6c726b8 100644 --- a/resimulate/commands/record.py +++ b/resimulate/commands/record.py @@ -1,4 +1,3 @@ -import pickle import signal from queue import Empty, Queue, ShutDown from threading import Thread @@ -11,33 +10,30 @@ from rich.live import Live from rich.progress import BarColumn, Progress, TextColumn, TimeElapsedColumn from rich.text import Text +from resimulate.recording import Recording +from resimulate.util.enums import ISDR_AID from resimulate.util.logger import log from resimulate.util.tracer import Tracer class Recorder: - def __init__(self, source: ApduSource): - self.tracer = Tracer(source) - - self.captured_apdus = [] + def __init__(self, source: ApduSource, src_isd_r: str): + isd_r_aid = ISDR_AID.get_aid(src_isd_r) + self.tracer = Tracer(source, isd_r_aid=isd_r_aid) + self.src_isd_r_aid = src_isd_r self.package_queue: Queue[tuple[Apdu, ApduCommand]] = Queue() self.tracer_thread = Thread( target=self.tracer.main, args=(self.package_queue,), daemon=True ) + self.recording = Recording() signal.signal(signal.SIGINT, self.__signal_handler) def __signal_handler(self, sig, frame): + log.debug("Received signal %s, shutting down capture.", sig) self.package_queue.shutdown(immediate=True) def record(self, output_path: str, timeout: int): - capture_progress = Progress( - TimeElapsedColumn(), - TextColumn("{task.completed}"), - TextColumn("[bold blue]{task.fields[packet_type]}"), - TextColumn("[bold green]{task.fields[packet_description]}"), - TextColumn("{task.fields[packet_code]}"), - ) overall_progress = Progress( TimeElapsedColumn(), @@ -55,12 +51,12 @@ class Recorder: ) overall_task_id = overall_progress.add_task( - f"[bold red]{len(self.captured_apdus)} packets captured!", + f"[bold red]{len(self.recording.apdus)} packets captured!", start=True, total=None, ) - with Live(main_group) as live: + with Live(main_group): self.tracer_thread.start() while self.tracer_thread.is_alive(): @@ -73,18 +69,7 @@ class Recorder: break log.info("Captured %s %s", apdu_command._name, apdu) - - """ capture_task_id = capture_progress.add_task( - "", - completed=len(self.captured_apdus), - packet_type=str(apdu_command._name) or "", - packet_description=str(apdu_command.path_str), - packet_code=str(apdu_command.col_sw), - ) """ - - self.captured_apdus.append(apdu) - - """ capture_progress.stop_task(capture_task_id) """ + self.recording.apdus.append(apdu) except TimeoutError: log.debug("Timeout reached, stopping capture.") break @@ -100,7 +85,7 @@ class Recorder: overall_progress.update( overall_task_id, - description=f"[bold green]{len(self.captured_apdus)} packet(s) captured!", + description=f"[bold green]{len(self.recording.apdus)} packet(s) captured!", ) overall_progress.update( @@ -108,13 +93,10 @@ class Recorder: description="[bold yellow]Saving captured APDU commands...", ) - log.debug("Saving captured APDU commands to %s", output_path) - - with open(output_path, "wb") as f: - pickle.dump(self.captured_apdus, f) + self.recording.save_file(output_path) overall_progress.update( overall_task_id, description="[bold green]Captured %s APDU packets!" - % len(self.captured_apdus), + % len(self.recording.apdus), ) diff --git a/resimulate/commands/replay.py b/resimulate/commands/replay.py index 606423c..d2dcbce 100644 --- a/resimulate/commands/replay.py +++ b/resimulate/commands/replay.py @@ -1,31 +1,57 @@ -import pickle -from contextlib import redirect_stdout - -from osmocom.utils import b2h +from osmocom.utils import b2h, h2b, h2i, i2h from pySim.apdu import Apdu -from pySim.app import init_card -from pySim.card_handler import CardHandler from pySim.transport.pcsc import PcscSimLink +from pySim.ts_102_221 import CardProfileUICC from rich.align import Align from rich.console import Group from rich.live import Live from rich.progress import BarColumn, Progress, TextColumn, TimeElapsedColumn from rich.text import Text -from resimulate.util.logger import LoggerWriter, log -from resimulate.util.pcsc_link import PcscLink +from resimulate.card import Card +from resimulate.recording import Recording +from resimulate.util.enums import ISDR_AID +from resimulate.util.logger import log class Replayer: - def __init__(self, device: int, isd_r_aid: str): + def __init__(self, device: int, target_isd_r: str): self.device = device - self.isd_r_aid = isd_r_aid + self.target_isd_r_aid = ISDR_AID.get_aid(target_isd_r) + + def __get_remaining_bytes(self, link: PcscSimLink, bytes_to_receive: int, cla: int): + log.debug("Retrieving remaining bytes: %d", bytes_to_receive) + apdu = Apdu(i2h([cla]) + "C00000" + i2h([bytes_to_receive])) + return self.__send_apdu(link, apdu) + + def __resend_with_modified_le(self, link: PcscSimLink, apdu: Apdu, le: int): + log.debug("Resending APDU with modified Le: %d", le) + modified_apdu = Apdu(b2h(apdu.cmd)[:-2] + i2h([le])) + return self.__send_apdu(link, modified_apdu) + + def __send_apdu(self, link: PcscSimLink, apdu: Apdu): + if self.recording.src_isd_r_aid and self.target_isd_r_aid: + if b2h(apdu.cmd_data) == self.recording.src_isd_r_aid.value: + apdu.cmd_data = h2b(self.target_isd_r_aid.value) + + log.debug( + "Sending APDU(%s) where CLA(%s), INS(%s), P1(%s), P2(%s), Lc(%s), DATA(%s), P3/Le(%s)", + b2h(apdu.cmd), + i2h([apdu.cla]), + i2h([apdu.ins]), + i2h([apdu.p1]), + i2h([apdu.p2]), + i2h([apdu.lc]), + b2h(apdu.cmd_data), + i2h([apdu.p3]), + ) + data, resp = link.send_tpdu(b2h(apdu.cmd)) + log.debug("Received Data: %s, SW: %s", data, resp) + return data, resp def replay(self, input_path: str): progress = Progress( - TimeElapsedColumn(), - BarColumn(), - TextColumn("{task.description}"), + TimeElapsedColumn(), BarColumn(), TextColumn("{task.description}") ) main_group = Group( @@ -43,56 +69,81 @@ class Replayer: ) with Live(main_group): - with open(input_path, "rb") as f: - apdus: list[Apdu] = pickle.load(f) - + self.recording = Recording.load_file(input_path) progress.update( progress_id, description="[bold green]Initializing PC/SC link..." ) try: pcsc_link = PcscSimLink() # PcscLink(device_index=self.device) - runtime_state, self.card = init_card(pcsc_link) + log.debug("PC/SC link initialized: %s", pcsc_link) + card = Card(pcsc_link).init_card(target_ids_r=self.target_isd_r_aid) + log.debug("Initialized card of type: %s", card.name) except Exception as e: log.error("Failed to initialize card: %s", e) + log.exception(e) progress.update( progress_id, description=":x: [bold red]Failed to initialize card." ) return + try: with pcsc_link as link: - for id, apdu in enumerate(apdus): + log.debug("Replaying APDUs...") + for idx, apdu in enumerate(self.recording.apdus, start=1): progress.update( progress_id, - total=len(apdus), - completed=id + 1, - description=f"Replaying APDU {id + 1} / {len(apdus)}", + total=len(self.recording.apdus), + completed=idx, + description=f"Replaying APDU {idx} / {len(self.recording.apdus)}", ) - cmd, resp = link.send_tpdu(b2h(apdu.cmd)) - log.debug("APDU: %s, SW: %s", b2h(apdu.cmd), resp) - if resp != b2h(apdu.sw): - log.info( - "Received APDU %s response does not match the expected APDU: %s != %s", - b2h(apdu.cmd), - resp, - b2h(apdu.sw), + data, resp = self.__send_apdu(link, apdu) + + if resp == b2h(apdu.sw): + continue + + log.debug( + "Unexpected SW: %s (expected: %s)", resp, b2h(apdu.sw) + ) + + if resp.startswith("61"): + remaining_bytes = h2i(resp[2:])[0] + log.debug( + "Normal processing, %s bytes still available", + remaining_bytes, ) + self.__get_remaining_bytes(link, remaining_bytes, apdu.cla) + continue + elif resp.startswith("6c"): + le = h2i(resp[2:])[0] + log.warning( + "Wrong length, resending with modified Le %s", le + ) + self.__resend_with_modified_le(link, apdu, le) + continue + + error, description = CardProfileUICC().interpret_sw(resp) + if error: + log.error("%s: %s", error, description) + else: + log.error( + "Unexpected response: %s != %s", resp, b2h(apdu.sw) + ) + except KeyboardInterrupt: log.debug("Replay interrupted.") progress.update( progress_id, description=":x: [bold red]Replay interrupted." ) - return except Exception as e: - log.error("Error during replay: %s", e) + log.exception(e) progress.update( progress_id, description=":x: [bold red]Error during replay." ) - return - - log.debug("Replay finished.") - progress.update( - progress_id, - description=":white_check_mark: [bold green]Replay finished.", - ) + else: + log.debug("Replay finished.") + progress.update( + progress_id, + description=":white_check_mark: [bold green]Replay finished.", + ) diff --git a/resimulate/exceptions.py b/resimulate/exceptions.py index e9c087f..16b9649 100644 --- a/resimulate/exceptions.py +++ b/resimulate/exceptions.py @@ -1,2 +1,6 @@ class PcscError(Exception): pass + + +class CardTypeException(Exception): + pass diff --git a/resimulate/recording.py b/resimulate/recording.py new file mode 100644 index 0000000..06d415f --- /dev/null +++ b/resimulate/recording.py @@ -0,0 +1,36 @@ +import pickle +from os.path import exists, isfile + +from pySim.apdu import Apdu + +from resimulate.util.enums import ISDR_AID +from resimulate.util.logger import log + + +class Recording: + apdus: list[Apdu] + src_isd_r_aid: ISDR_AID + + def __init__(self, src_isd_r: str = "default"): + self.src_isd_r_aid = ISDR_AID.get_aid(src_isd_r) + self.apdus = [] + + @staticmethod + def load_file(file_path: str) -> "Recording": + if not exists(file_path) or not isfile(file_path): + raise FileNotFoundError(f"File {file_path} not found.") + + with open(file_path, "rb") as f: + recording = pickle.load(f) + + if not isinstance(recording, Recording): + raise TypeError(f"File {file_path} does not contain a Recording object.") + + log.debug("Loaded %d APDUs from %s", len(recording.apdus), file_path) + + return recording + + def save_file(self, file_path: str) -> None: + log.debug("Saving captured APDU commands to %s", file_path) + with open(file_path, "wb") as f: + pickle.dump(self, f) diff --git a/resimulate/util/enums.py b/resimulate/util/enums.py index a0177c0..486ce9e 100644 --- a/resimulate/util/enums.py +++ b/resimulate/util/enums.py @@ -1,6 +1,12 @@ import enum +from typing import Union class ISDR_AID(str, enum.Enum): _DEFAULT = "A0000005591010FFFFFFFF8900000100" _5BER = "A0000005591010FFFFFFFF8900050500" + + @staticmethod + def get_aid(aid_description: str) -> Union["ISDR_AID", None]: + mapping = {"default": ISDR_AID._DEFAULT, "5ber": ISDR_AID._5BER} + return mapping.get(aid_description) diff --git a/resimulate/util/pcsc_link.py b/resimulate/util/pcsc_link.py index 1511a3c..100b659 100644 --- a/resimulate/util/pcsc_link.py +++ b/resimulate/util/pcsc_link.py @@ -1,5 +1,5 @@ from osmocom.utils import Hexstr, h2i, i2h -from pySim.transport import LinkBaseTpdu, ProactiveHandler +from pySim.transport import LinkBaseTpdu from pySim.utils import ResTuple from smartcard import System from smartcard.CardConnection import CardConnection @@ -14,6 +14,7 @@ from smartcard.pcsc.PCSCReader import PCSCReader from resimulate.exceptions import PcscError from resimulate.util.logger import log +from resimulate.util.proactive_handler import ProactiveHandler class PcscLink(LinkBaseTpdu): @@ -32,7 +33,7 @@ class PcscLink(LinkBaseTpdu): ) def __str__(self) -> str: - return "PCSC[%s]" % (self._reader) + return "PCSC[%s]" % (self.pcsc_device) def __del__(self): self.disconnect() @@ -71,7 +72,11 @@ class PcscLink(LinkBaseTpdu): log.debug("Connected to device %s", self.pcsc_device) def disconnect(self): - self.card_connection.disconnect() + try: + self.card_connection.disconnect() + except AttributeError: + pass + log.debug("Disconnected from device %s", self.pcsc_device) def _reset_card(self): diff --git a/resimulate/util/proactive_handler.py b/resimulate/util/proactive_handler.py new file mode 100644 index 0000000..bc281a0 --- /dev/null +++ b/resimulate/util/proactive_handler.py @@ -0,0 +1,12 @@ +from pySim.transport import ProactiveHandler as Proact +from pySim.transport import ProactiveCommand + +from resimulate.util.logger import log + + +class ProactiveHandler(Proact): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def receive_fetch(self, cmd: ProactiveCommand): + log.debug("Handling proactive command:", cmd) diff --git a/resimulate/util/tracer.py b/resimulate/util/tracer.py index afeee27..18319ae 100644 --- a/resimulate/util/tracer.py +++ b/resimulate/util/tracer.py @@ -18,6 +18,7 @@ from pySim.ts_31_103 import CardApplicationISIM from pySim.ts_102_221 import CardProfileUICC from resimulate.util.dummy_sim_link import DummySimLink +from resimulate.util.enums import ISDR_AID from resimulate.util.logger import log APDU_COMMANDS = ( @@ -27,13 +28,13 @@ APDU_COMMANDS = ( # Taken from the pySim project and modified for the ReSIMulate project class Tracer: - def __init__(self, source: ApduSource): + def __init__(self, source: ApduSource, isd_r_aid: ISDR_AID): # we assume a generic UICC profile; as all APDUs return 9000 in DummySimLink above, # all CardProfileAddon (including SIM) will probe successful. profile = CardProfileUICC() profile.add_application(CardApplicationUSIM()) profile.add_application(CardApplicationISIM()) - profile.add_application(CardApplicationISDR()) + profile.add_application(CardApplicationISDR(aid=isd_r_aid.value)) profile.add_application(CardApplicationECASD()) profile.add_application(CardApplicationARAM()) profile.add_application(CardApplicationISD())