feat: better replay
This commit is contained in:
13
.vscode/launch.json
vendored
13
.vscode/launch.json
vendored
@@ -8,9 +8,18 @@
|
|||||||
"name": "reSIMulate: record",
|
"name": "reSIMulate: record",
|
||||||
"type": "debugpy",
|
"type": "debugpy",
|
||||||
"request": "launch",
|
"request": "launch",
|
||||||
"program": "${workspaceFolder}/resimulate/resimulate.py",
|
"program": "${workspaceFolder}/resimulate.py",
|
||||||
"console": "integratedTerminal",
|
"console": "integratedTerminal",
|
||||||
"args": ["record", "-o test.pkl"],
|
"args": ["record", "-o", "test.apdu"],
|
||||||
|
"justMyCode": false
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "reSIMulate: replay",
|
||||||
|
"type": "debugpy",
|
||||||
|
"request": "launch",
|
||||||
|
"program": "${workspaceFolder}/resimulate.py",
|
||||||
|
"console": "integratedTerminal",
|
||||||
|
"args": ["replay", "-i", "test.pkl"],
|
||||||
"justMyCode": false
|
"justMyCode": false
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
|
|||||||
@@ -4,11 +4,12 @@
|
|||||||
import argparse
|
import argparse
|
||||||
|
|
||||||
import argcomplete
|
import argcomplete
|
||||||
from commands.record import Recorder
|
|
||||||
from commands.replay import Replayer
|
|
||||||
from pySim.apdu_source.gsmtap import GsmtapApduSource
|
from pySim.apdu_source.gsmtap import GsmtapApduSource
|
||||||
from rich_argparse import RichHelpFormatter
|
from rich_argparse import RichHelpFormatter
|
||||||
|
|
||||||
|
from resimulate.commands.record import Recorder
|
||||||
|
from resimulate.commands.replay import Replayer
|
||||||
|
|
||||||
parser = argparse.ArgumentParser(
|
parser = argparse.ArgumentParser(
|
||||||
description="ReSIMulate is a terminal application built for eSIM and SIM-specific APDU analysis. It captures APDU commands, saves them, and replays them to facilitate differential testing, ensuring accurate validation and debugging of SIM interactions.",
|
description="ReSIMulate is a terminal application built for eSIM and SIM-specific APDU analysis. It captures APDU commands, saves them, and replays them to facilitate differential testing, ensuring accurate validation and debugging of SIM interactions.",
|
||||||
formatter_class=RichHelpFormatter,
|
formatter_class=RichHelpFormatter,
|
||||||
@@ -62,7 +63,7 @@ replay_parser.add_argument(
|
|||||||
"-i",
|
"-i",
|
||||||
"--input",
|
"--input",
|
||||||
required=True,
|
required=True,
|
||||||
type=argparse.FileType("r"),
|
type=argparse.FileType("rb"),
|
||||||
help="File containing APDU commands to replay (e.g., 'commands.apdu').",
|
help="File containing APDU commands to replay (e.g., 'commands.apdu').",
|
||||||
)
|
)
|
||||||
replay_parser.add_argument(
|
replay_parser.add_argument(
|
||||||
@@ -70,7 +71,14 @@ replay_parser.add_argument(
|
|||||||
"--pcsc-device",
|
"--pcsc-device",
|
||||||
type=int,
|
type=int,
|
||||||
default=0,
|
default=0,
|
||||||
help="Target simtrace device to send APDU commands (default: 0).",
|
help="Target PC/SC device to send APDU commands (default: 0).",
|
||||||
|
)
|
||||||
|
replay_parser.add_argument(
|
||||||
|
"--isd-r-aid",
|
||||||
|
type=str,
|
||||||
|
default="default",
|
||||||
|
choices=["default", "5ber"],
|
||||||
|
help="ISD-R AID to use for replaying APDU commands (default: 'default').",
|
||||||
)
|
)
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
@@ -84,7 +92,7 @@ if __name__ == "__main__":
|
|||||||
recorder.record(args.output, args.timeout)
|
recorder.record(args.output, args.timeout)
|
||||||
|
|
||||||
elif args.command == "replay":
|
elif args.command == "replay":
|
||||||
replayer = Replayer(args.pcsc_device)
|
replayer = Replayer(args.pcsc_device, args.isd_r_aid)
|
||||||
replayer.replay(args.input.name)
|
replayer.replay(args.input.name)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
@@ -10,8 +10,9 @@ from rich.console import Group
|
|||||||
from rich.live import Live
|
from rich.live import Live
|
||||||
from rich.progress import BarColumn, Progress, TextColumn, TimeElapsedColumn
|
from rich.progress import BarColumn, Progress, TextColumn, TimeElapsedColumn
|
||||||
from rich.text import Text
|
from rich.text import Text
|
||||||
from util.logger import log
|
|
||||||
from util.tracer import Tracer
|
from resimulate.util.logger import log
|
||||||
|
from resimulate.util.tracer import Tracer
|
||||||
|
|
||||||
|
|
||||||
class Recorder:
|
class Recorder:
|
||||||
|
|||||||
@@ -1,23 +1,25 @@
|
|||||||
import pickle
|
import pickle
|
||||||
|
from contextlib import redirect_stdout
|
||||||
|
|
||||||
from osmocom.utils import b2h
|
from osmocom.utils import b2h
|
||||||
from pySim.apdu import Apdu
|
from pySim.apdu import Apdu
|
||||||
from pySim.app import init_card
|
from pySim.app import init_card
|
||||||
from pySim.card_handler import CardHandler
|
from pySim.card_handler import CardHandler
|
||||||
|
from pySim.transport.pcsc import PcscSimLink
|
||||||
from rich.align import Align
|
from rich.align import Align
|
||||||
from rich.console import Group
|
from rich.console import Group
|
||||||
from rich.live import Live
|
from rich.live import Live
|
||||||
from rich.progress import BarColumn, Progress, TextColumn, TimeElapsedColumn
|
from rich.progress import BarColumn, Progress, TextColumn, TimeElapsedColumn
|
||||||
from rich.text import Text
|
from rich.text import Text
|
||||||
from util.logger import log
|
|
||||||
from util.pcsc_link import PcscLink
|
from resimulate.util.logger import LoggerWriter, log
|
||||||
|
from resimulate.util.pcsc_link import PcscLink
|
||||||
|
|
||||||
|
|
||||||
class Replayer:
|
class Replayer:
|
||||||
def __init__(self, device: int):
|
def __init__(self, device: int, isd_r_aid: str):
|
||||||
self.pcsc_link = PcscLink(device=device)
|
self.device = device
|
||||||
self.card_handler = CardHandler(self.pcsc_link)
|
self.isd_r_aid = isd_r_aid
|
||||||
self.runtime_state, self.card = init_card(self.card_handler)
|
|
||||||
|
|
||||||
def replay(self, input_path: str):
|
def replay(self, input_path: str):
|
||||||
progress = Progress(
|
progress = Progress(
|
||||||
@@ -37,17 +39,60 @@ class Replayer:
|
|||||||
)
|
)
|
||||||
|
|
||||||
progress_id = progress.add_task(
|
progress_id = progress.add_task(
|
||||||
f"[bold red]Loading APDUs from {input_path}...", start=True, total=None
|
f"[bold green]Loading APDUs from {input_path}...", start=True, total=None
|
||||||
)
|
)
|
||||||
|
|
||||||
with Live(main_group):
|
with Live(main_group):
|
||||||
with open(input_path, "rb") as f:
|
with open(input_path, "rb") as f:
|
||||||
apdus: list[Apdu] = pickle.load(f)
|
apdus: list[Apdu] = pickle.load(f)
|
||||||
|
|
||||||
with self.pcsc_link as link:
|
progress.update(
|
||||||
for id, apdu in enumerate(apdus):
|
progress_id, description="[bold green]Initializing PC/SC link..."
|
||||||
progress.update(
|
)
|
||||||
progress_id, description=f"Replaying APDU {id} / {len(apdus)}"
|
|
||||||
)
|
try:
|
||||||
data, sw = link.send_tpdu(b2h(apdu))
|
pcsc_link = PcscSimLink() # PcscLink(device_index=self.device)
|
||||||
log.debug("APDU: %s, SW: %s", b2h(apdu), sw)
|
runtime_state, self.card = init_card(pcsc_link)
|
||||||
|
except Exception as e:
|
||||||
|
log.error("Failed to initialize card: %s", e)
|
||||||
|
progress.update(
|
||||||
|
progress_id, description=":x: [bold red]Failed to initialize card."
|
||||||
|
)
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
with pcsc_link as link:
|
||||||
|
for id, apdu in enumerate(apdus):
|
||||||
|
progress.update(
|
||||||
|
progress_id,
|
||||||
|
total=len(apdus),
|
||||||
|
completed=id + 1,
|
||||||
|
description=f"Replaying APDU {id + 1} / {len(apdus)}",
|
||||||
|
)
|
||||||
|
cmd, resp = link.send_tpdu(b2h(apdu.cmd))
|
||||||
|
log.debug("APDU: %s, SW: %s", b2h(apdu.cmd), resp)
|
||||||
|
|
||||||
|
if resp != b2h(apdu.sw):
|
||||||
|
log.info(
|
||||||
|
"Received APDU %s response does not match the expected APDU: %s != %s",
|
||||||
|
b2h(apdu.cmd),
|
||||||
|
resp,
|
||||||
|
b2h(apdu.sw),
|
||||||
|
)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
log.debug("Replay interrupted.")
|
||||||
|
progress.update(
|
||||||
|
progress_id, description=":x: [bold red]Replay interrupted."
|
||||||
|
)
|
||||||
|
return
|
||||||
|
except Exception as e:
|
||||||
|
log.error("Error during replay: %s", e)
|
||||||
|
progress.update(
|
||||||
|
progress_id, description=":x: [bold red]Error during replay."
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
log.debug("Replay finished.")
|
||||||
|
progress.update(
|
||||||
|
progress_id,
|
||||||
|
description=":white_check_mark: [bold green]Replay finished.",
|
||||||
|
)
|
||||||
|
|||||||
6
resimulate/util/enums.py
Normal file
6
resimulate/util/enums.py
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
import enum
|
||||||
|
|
||||||
|
|
||||||
|
class ISDR_AID(str, enum.Enum):
|
||||||
|
_DEFAULT = "A0000005591010FFFFFFFF8900000100"
|
||||||
|
_5BER = "A0000005591010FFFFFFFF8900050500"
|
||||||
@@ -1,8 +1,10 @@
|
|||||||
import logging
|
import logging
|
||||||
|
import sys
|
||||||
|
|
||||||
from rich.console import Console
|
from rich.console import Console
|
||||||
from rich.logging import RichHandler
|
from rich.logging import RichHandler
|
||||||
from util.apdu_highlighter import ApduHighlighter
|
|
||||||
|
from resimulate.util.apdu_highlighter import ApduHighlighter
|
||||||
|
|
||||||
|
|
||||||
class RichLogger:
|
class RichLogger:
|
||||||
@@ -31,4 +33,16 @@ class RichLogger:
|
|||||||
return self.logger
|
return self.logger
|
||||||
|
|
||||||
|
|
||||||
|
class LoggerWriter:
|
||||||
|
def __init__(self, level):
|
||||||
|
self.level = level
|
||||||
|
|
||||||
|
def write(self, message, *args, **kwargs):
|
||||||
|
if message != "\n":
|
||||||
|
self.level(message)
|
||||||
|
|
||||||
|
def flush(self):
|
||||||
|
self.level(sys.stderr)
|
||||||
|
|
||||||
|
|
||||||
log = RichLogger().get_logger()
|
log = RichLogger().get_logger()
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
from exceptions import PcscError
|
|
||||||
from osmocom.utils import Hexstr, h2i, i2h
|
from osmocom.utils import Hexstr, h2i, i2h
|
||||||
from pySim.transport import LinkBaseTpdu
|
from pySim.transport import LinkBaseTpdu, ProactiveHandler
|
||||||
from pySim.utils import ResTuple
|
from pySim.utils import ResTuple
|
||||||
from smartcard import System
|
from smartcard import System
|
||||||
from smartcard.CardConnection import CardConnection
|
from smartcard.CardConnection import CardConnection
|
||||||
@@ -11,20 +10,23 @@ from smartcard.Exceptions import (
|
|||||||
NoCardException,
|
NoCardException,
|
||||||
)
|
)
|
||||||
from smartcard.ExclusiveConnectCardConnection import ExclusiveConnectCardConnection
|
from smartcard.ExclusiveConnectCardConnection import ExclusiveConnectCardConnection
|
||||||
from util.logger import log
|
from smartcard.pcsc.PCSCReader import PCSCReader
|
||||||
|
|
||||||
|
from resimulate.exceptions import PcscError
|
||||||
|
from resimulate.util.logger import log
|
||||||
|
|
||||||
|
|
||||||
class PcscLink(LinkBaseTpdu):
|
class PcscLink(LinkBaseTpdu):
|
||||||
protocol = CardConnection.T0_protocol
|
protocol = CardConnection.T0_protocol
|
||||||
|
|
||||||
def __init__(self, device: int, **kwargs):
|
def __init__(self, device_index: int, **kwargs):
|
||||||
super().__init__(**kwargs)
|
super().__init__(proactive_handler=ProactiveHandler(), **kwargs)
|
||||||
|
|
||||||
readers = System.readers()
|
readers: list[PCSCReader] = System.readers()
|
||||||
if device > len(readers):
|
if device_index > len(readers):
|
||||||
raise PcscError(f"Device with index {device} not found.")
|
raise PcscError(f"Device with index {device_index} not found.")
|
||||||
|
|
||||||
self.pcsc_device = readers[device]
|
self.pcsc_device = readers[device_index]
|
||||||
self.card_connection = ExclusiveConnectCardConnection(
|
self.card_connection = ExclusiveConnectCardConnection(
|
||||||
self.pcsc_device.createConnection()
|
self.pcsc_device.createConnection()
|
||||||
)
|
)
|
||||||
@@ -33,40 +35,38 @@ class PcscLink(LinkBaseTpdu):
|
|||||||
return "PCSC[%s]" % (self._reader)
|
return "PCSC[%s]" % (self._reader)
|
||||||
|
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
try:
|
self.disconnect()
|
||||||
self.card_connection.disconnect()
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
|
|
||||||
def __enter__(self):
|
def __enter__(self):
|
||||||
self.connect()
|
self.connect()
|
||||||
return self
|
return self
|
||||||
|
|
||||||
def __exit__(self, exc_type, exc_value, traceback):
|
def __exit__(self, exc_type, exc_value, traceback):
|
||||||
self.card_connection.disconnect()
|
self.disconnect()
|
||||||
log.debug("Disconnected from device %s", self.pcsc_device)
|
|
||||||
|
|
||||||
def connect(self):
|
def connect(self):
|
||||||
try:
|
try:
|
||||||
self.card_connection.disconnect()
|
self.card_connection.disconnect()
|
||||||
self.card_connection.connect()
|
self.card_connection.connect()
|
||||||
supported_protocols = self.card_connection.getSupportedProtocols()
|
supported_protocol = self.card_connection.getProtocol()
|
||||||
self.card_connection.disconnect()
|
self.card_connection.disconnect()
|
||||||
|
|
||||||
if supported_protocols & CardConnection.T0_protocol:
|
if supported_protocol & CardConnection.T0_protocol:
|
||||||
protocol = CardConnection.T0_protocol
|
protocol = CardConnection.T0_protocol
|
||||||
elif supported_protocols & CardConnection.T1_protocol:
|
elif supported_protocol & CardConnection.T1_protocol:
|
||||||
protocol = CardConnection.T1_protocol
|
protocol = CardConnection.T1_protocol
|
||||||
else:
|
else:
|
||||||
raise PcscError("No supported protocol found.")
|
raise PcscError("No supported protocol found: %s" % supported_protocol)
|
||||||
|
|
||||||
|
self.set_tpdu_format(protocol)
|
||||||
log.debug(
|
log.debug(
|
||||||
"Connecting to device %s using protocol %s", self.pcsc_device, protocol
|
"Connecting to device %s using protocol T%s", self.pcsc_device, protocol
|
||||||
)
|
)
|
||||||
|
|
||||||
self.card_connection.connect(protocol=protocol)
|
self.card_connection.connect(protocol=protocol)
|
||||||
except (CardConnectionException, NoCardException) as e:
|
except (CardConnectionException, NoCardException) as e:
|
||||||
raise PcscError from e
|
log.error("Failed to connect to device")
|
||||||
|
raise PcscError("Failed to connect to device") from e
|
||||||
|
|
||||||
log.debug("Connected to device %s", self.pcsc_device)
|
log.debug("Connected to device %s", self.pcsc_device)
|
||||||
|
|
||||||
@@ -75,24 +75,29 @@ class PcscLink(LinkBaseTpdu):
|
|||||||
log.debug("Disconnected from device %s", self.pcsc_device)
|
log.debug("Disconnected from device %s", self.pcsc_device)
|
||||||
|
|
||||||
def _reset_card(self):
|
def _reset_card(self):
|
||||||
|
log.debug("Resetting card...")
|
||||||
self.disconnect()
|
self.disconnect()
|
||||||
self.connect()
|
self.connect()
|
||||||
|
return 1
|
||||||
|
|
||||||
def wait_for_card(self, timeout: int | None = None, newcardonly: bool = False):
|
def wait_for_card(self, timeout: int | None = None, newcardonly: bool = False):
|
||||||
card_request = CardRequest(
|
card_request = CardRequest(
|
||||||
readers=[self.pcsc_device], timeout=timeout, newcardonly=newcardonly
|
readers=[self.pcsc_device], timeout=timeout, newcardonly=newcardonly
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
log.debug("Waiting for card on device %s", self.pcsc_device)
|
log.debug("Waiting for card...")
|
||||||
card_request.waitforcard()
|
card_request.waitforcard()
|
||||||
except CardRequestTimeoutException as e:
|
except CardRequestTimeoutException as e:
|
||||||
raise PcscError from e
|
raise PcscError("Timeout waiting for card") from e
|
||||||
|
|
||||||
self.__connect()
|
self.connect()
|
||||||
|
|
||||||
|
def get_atr(self) -> Hexstr:
|
||||||
|
return self.card_connection.getATR()
|
||||||
|
|
||||||
def send_tpdu(self, tpdu: Hexstr) -> ResTuple:
|
def send_tpdu(self, tpdu: Hexstr) -> ResTuple:
|
||||||
try:
|
try:
|
||||||
data, sw1, sw2 = self.card_connection.transmit(h2i(tpdu))
|
data, sw1, sw2 = self.card_connection.transmit(h2i(tpdu))
|
||||||
return i2h(data) + i2h([sw1, sw2])
|
return i2h(data), i2h([sw1, sw2])
|
||||||
except CardConnectionException as e:
|
except CardConnectionException as e:
|
||||||
raise PcscError from e
|
raise PcscError("Failed to send TPDU") from e
|
||||||
|
|||||||
@@ -16,8 +16,9 @@ from pySim.runtime import RuntimeState
|
|||||||
from pySim.ts_31_102 import CardApplicationUSIM
|
from pySim.ts_31_102 import CardApplicationUSIM
|
||||||
from pySim.ts_31_103 import CardApplicationISIM
|
from pySim.ts_31_103 import CardApplicationISIM
|
||||||
from pySim.ts_102_221 import CardProfileUICC
|
from pySim.ts_102_221 import CardProfileUICC
|
||||||
from util.dummy_sim_link import DummySimLink
|
|
||||||
from util.logger import log
|
from resimulate.util.dummy_sim_link import DummySimLink
|
||||||
|
from resimulate.util.logger import log
|
||||||
|
|
||||||
APDU_COMMANDS = (
|
APDU_COMMANDS = (
|
||||||
UiccApduCommands + UsimApduCommands + ManageApduCommands + GlobalPlatformCommands
|
UiccApduCommands + UsimApduCommands + ManageApduCommands + GlobalPlatformCommands
|
||||||
@@ -42,8 +43,8 @@ class Tracer:
|
|||||||
self.runtime_state = RuntimeState(card, profile)
|
self.runtime_state = RuntimeState(card, profile)
|
||||||
self.apdu_decoder = ApduDecoder(APDU_COMMANDS)
|
self.apdu_decoder = ApduDecoder(APDU_COMMANDS)
|
||||||
|
|
||||||
self.suppress_status = True
|
self.suppress_status = False
|
||||||
self.suppress_select = True
|
self.suppress_select = False
|
||||||
self.show_raw_apdu = False
|
self.show_raw_apdu = False
|
||||||
self.source = source
|
self.source = source
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user