On the Wire

Evan Booth Evan Booth

Objective

DifficultyDescription
4/5Help Evan next to city hall hack this gnome and retrieve the temperature value reported by the I²C device at address 0x3C. The temperature data is XOR-encrypted, so you’ll need to work through each communication stage to uncover the necessary keys. Start with the unencrypted data being transmitted over the 1-wire protocol.

Evan Booth mission statement

Hey, I’m Evan!

I like to build things.

All sorts of things.

If you aren’t failing on some front, consider adjusting your difficulty settings.

Come find me later - I might need a hand with something.


So here’s the deal - there are some seriously bizarre signals floating around this area.

Not your typical radio chatter or WiFi noise, but something… different.

I’ve been trying to make sense of the patterns, but it’s like trying to build a robot hand out of a coffee maker - you need the right approach.

Think you can help me decode whatever weirdness is being transmitted out there?

You know what happens to electronics in extreme cold? They fail. All my builds, all my robots, all my weird coffee-maker contraptions—frozen solid. We can’t let Frosty turn this place into a permanent deep freeze.

Solution

This objective focuses on low-level hardware communication and protocol analysis rather than traditional network traffic. By working through the 1-Wire communication stages and reversing the XOR-based obfuscation, I had to reconstruct how sensor data is exchanged and protected on constrained devices. The challenge highlights how security weaknesses can emerge in embedded systems when lightweight encryption is applied without strong key management, and how understanding protocol mechanics is essential when investigating IoT-style environments.

The endpoints where extracted by viesing for WS in Network tab:

ProtocolEndpoint nameEndpoint URL
1-Wiredqwss://signals.holidayhackchallenge.com/wire/dq
SPImosiwss://signals.holidayhackchallenge.com/wire/mosi
SPIsckwss://signals.holidayhackchallenge.com/wire/sck
I2Csdawss://signals.holidayhackchallenge.com/wire/sda
I2Csclwss://signals.holidayhackchallenge.com/wire/scl

1-Wire

#!/usr/bin/env python3
"""
SANS Holiday Hack Challenge 2025 – "On the Wire" (1-Wire / dq)

What this script does
---------------------
1) Connects to the dq WebSocket endpoint.
2) Records one complete transmission (from marker "reset" to marker "stop").
3) Saves the raw capture to dq_capture.json for reproducibility in a write-up.
4) Decodes 1-Wire data by measuring LOW pulse widths (timing-based decoding).
5) Prints decoded bytes as HEX + ASCII and extracts the XOR key if present.

Why timing matters
------------------
1-Wire does NOT send bits as a simple sequence of 0/1 voltage levels.
Instead, each bit is encoded by how long the line stays LOW during a time slot.

Important implementation detail for this challenge feed:
- The 'presence' marker is on the FALLING edge of the presence pulse (v=0).
  If you start decoding immediately at that record, you will accidentally treat
  the presence pulse as a data bit and the entire decode will be shifted.
  Therefore, we start decoding AFTER the presence pulse ends (next v==1).

Expected outcome
----------------
The decoded ASCII should include an instruction ending with:
"... XOR key: icy"
"""

import json
import websocket
from collections import Counter

# -----------------------------
# Capture control
# -----------------------------

START_MARKER = "reset"
STOP_MARKER = "stop"

recording = False
recording_done = False
recorded = []  # list of dict frames


def on_message(ws, message: str):
    """Capture frames between START_MARKER and STOP_MARKER."""
    global recording, recording_done, recorded

    if recording_done:
        return

    msg = json.loads(message)
    marker = msg.get("marker")

    # Start recording at reset
    if not recording and marker == START_MARKER:
        recording = True
        recorded = [msg]  # include reset frame
        print("START recording (reset)")
        return

    # Ignore everything until we see the start marker
    if not recording:
        return

    # Recording phase
    recorded.append(msg)

    # Stop recording at stop marker
    if marker == STOP_MARKER:
        print(f"STOP recording (stop). Events captured: {len(recorded)}")
        recording = False
        recording_done = True
        ws.keep_running = False
        ws.close()


def on_error(ws, error):
    print("WebSocket error:", error)


def on_close(ws, close_status_code, close_msg):
    print("Connection Closed", close_status_code, close_msg)


# -----------------------------
# 1-Wire decoding helpers
# -----------------------------

def collapse_levels(frames):
    """
    Collapse consecutive identical levels to obtain a transition list.

    Input : list of frames dicts with keys t, v
    Output: list of (t, v) tuples where v flips each entry
    """
    frames = sorted(frames, key=lambda x: x["t"])
    out = []
    last_v = None
    for f in frames:
        t = f["t"]
        v = int(f["v"])
        if last_v is None or v != last_v:
            out.append((t, v))
            last_v = v
    return out


def extract_low_pulse_widths(transitions):
    """
    Extract LOW pulse widths.

    A LOW pulse is a segment where v==0 followed by v==1.
    Width = t_rise - t_fall
    """
    widths = []
    for i in range(len(transitions) - 1):
        t0, v0 = transitions[i]
        t1, v1 = transitions[i + 1]
        if v0 == 0 and v1 == 1:
            widths.append(t1 - t0)
    return widths


def infer_threshold(widths):
    """
    Infer a threshold separating short and long LOW pulses.

    This challenge feed typically has two dominant widths (e.g. 6 and 60).
    We take the two most common widths and set the threshold midway.
    """
    counts = Counter(widths)
    common = [w for (w, _n) in counts.most_common(5)]
    if len(common) < 2:
        raise ValueError("Not enough distinct pulse widths to infer threshold.")
    w_short = min(common[0], common[1])
    w_long = max(common[0], common[1])
    threshold = (w_short + w_long) / 2.0
    return threshold, w_short, w_long, counts


def bits_to_bytes_lsb(bits):
    """
    1-Wire transmits bits LSB-first within each byte.

    bit0 is sent first, then bit1, ..., bit7.
    """
    out = []
    cur = 0
    pos = 0
    for b in bits:
        cur |= (b & 1) << pos
        pos += 1
        if pos == 8:
            out.append(cur)
            cur = 0
            pos = 0
    return bytes(out)


def hexdump_and_ascii(data: bytes):
    """Return a (hex_string, ascii_string) pair."""
    hx = " ".join(f"{b:02x}" for b in data)
    asc = "".join(chr(b) if 0x20 <= b <= 0x7E else "." for b in data)
    return hx, asc


def find_decode_start_index(frames):
    """
    Start decoding AFTER the presence pulse ends.

    The feed marks:
      presence marker at the FALLING edge (v=0)
    So we must move to the next frame where v==1 after that timestamp,
    otherwise the presence pulse width (e.g. 150) will be misread as a data bit.

    If no presence marker exists, fall back to "reset" and start after it returns high.
    """
    frames = sorted(frames, key=lambda x: x["t"])

    def start_after_marker(marker_name: str):
        for i, f in enumerate(frames):
            if f.get("marker") == marker_name:
                t_mark = f["t"]
                # Find next rising level (v==1) AFTER this marker timestamp
                for j in range(i + 1, len(frames)):
                    if frames[j]["t"] > t_mark and int(frames[j]["v"]) == 1:
                        return j
                return i
        return None

    idx = start_after_marker("presence")
    if idx is not None:
        return idx

    idx = start_after_marker("reset")
    if idx is not None:
        return idx

    return 0


def decode_1wire(frames):
    """
    Full pipeline:
    - Determine correct start index (after presence pulse ends)
    - Collapse levels to transitions
    - Extract LOW widths
    - Infer threshold separating short vs long widths
    - Map short->1, long->0 (standard 1-wire convention)
    - Pack bits LSB-first into bytes
    """
    start_idx = find_decode_start_index(frames)
    subset = frames[start_idx:]

    transitions = collapse_levels(subset)
    widths = extract_low_pulse_widths(transitions)

    threshold, w_short, w_long, counts = infer_threshold(widths)

    # Standard mapping for this challenge stream:
    #   short low pulse => bit 1
    #   long  low pulse => bit 0
    bits = [1 if w < threshold else 0 for w in widths]

    decoded = bits_to_bytes_lsb(bits)

    return decoded, {
        "start_idx": start_idx,
        "counts": counts,
        "threshold": threshold,
        "w_short": w_short,
        "w_long": w_long,
        "num_widths": len(widths),
        "num_bits": len(bits),
        "num_bytes": len(decoded),
    }


def extract_printable(data: bytes) -> str:
    """Extract printable ASCII characters for quick human inspection."""
    return "".join(chr(b) for b in data if 0x20 <= b <= 0x7E)


# -----------------------------
# Main
# -----------------------------

def main():
    endpoint = "wss://signals.holidayhackchallenge.com/wire/dq"
    ws = websocket.WebSocketApp(
        endpoint,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )

    ws.run_forever()

    # Save raw capture for your write-up
    with open("dq_capture.json", "w", encoding="utf-8") as out:
        json.dump(recorded, out, indent=2)

    decoded, meta = decode_1wire(recorded)

    print("\n--- LOW pulse width histogram (top 10) ---")
    for w, n in meta["counts"].most_common(10):
        print(f"width={w:>6}  count={n}")

    print("\n--- Decode parameters ---")
    print(f"decode start index: {meta['start_idx']}")
    print(f"short width: {meta['w_short']}")
    print(f"long  width: {meta['w_long']}")
    print(f"threshold : {meta['threshold']:.2f} (short<thr => bit=1)")
    print(f"bits      : {meta['num_bits']}")
    print(f"bytes     : {meta['num_bytes']}")

    hx, asc = hexdump_and_ascii(decoded)
    print("\n--- Decoded payload ---")
    print("HEX  :", hx)
    print("ASCII:", asc)

    printable = extract_printable(decoded)
    print("\n--- Printable only ---")
    print(printable)

    # Optional: try to pull a key after "key:"
    if "key:" in printable.lower():
        # Split on last occurrence of ':' to be tolerant of other colons earlier
        candidate = printable.split(":")[-1].strip()
        print("\n--- Candidate XOR key ---")
        print(candidate)


if __name__ == "__main__":
    main()

This script outputted:

1-Wire XOR 1-Wire XOR

The XOR key is:

icy

SPI

#!/usr/bin/env python3
"""
SANS Holiday Hack Challenge 2025 – "On the Wire" (SPI stage, LIVE)
==================================================================

Goal
----
Capture the LIVE SPI signals from the WebSocket "wires" and decode the byte stream.
Then XOR-decrypt it using the key recovered from the 1-Wire stage (key: "icy").

Why brute force is necessary here
---------------------------------
SPI decoding depends on:
  - Which clock edge you sample on (rising vs falling) -> CPHA/Mode issues
  - Bit order within each byte (MSB-first vs LSB-first)
  - Byte alignment (you might start sampling 1–7 bits too early/late)

If any of these are wrong, XOR output may look like printable garbage
("text-like", but no real words). The fastest way to recover is to brute force:
  - edge ∈ {rising, falling}
  - bit_order ∈ {msb, lsb}
  - bit_shift ∈ {0..7}  (re-align byte boundaries)

This script:
  1) Connects to /wire/sck and /wire/mosi
  2) Captures one broadcast loop (detected by MOSI timestamp wrap-around t dropping)
  3) Decodes using the brute-force matrix above
  4) Ranks candidates by a simple plaintext-likeness score
  5) Prints top candidates and full hexdump for the best candidate

Reproducibility
---------------
- The script saves the raw samples to spi_sck_live.json and spi_mosi_live.json.
"""

from __future__ import annotations

import json
import threading
import time
from collections import defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional, Tuple

import websocket


# -----------------------------
# Endpoints
# -----------------------------
SCK_ENDPOINT  = "wss://signals.holidayhackchallenge.com/wire/sck"
MOSI_ENDPOINT = "wss://signals.holidayhackchallenge.com/wire/mosi"

# XOR key from 1-Wire stage
XOR_KEY = b"icy"

# -----------------------------
# Capture stop conditions
# -----------------------------
MAX_CAPTURE_SECONDS = 20.0   # safety cap
POST_WRAP_GRACE_MS = 150     # let a little extra data arrive after wrap detected


# -----------------------------
# Data structures
# -----------------------------
@dataclass(frozen=True)
class Event:
    t: int
    v: int
    marker: Optional[str] = None


lock = threading.Lock()

wire_frames = defaultdict(list)  # wire_frames["sck"] = [Event...], ["mosi"] = [Event...]

done = False
wrap_detected = False
wrap_wallclock_time: Optional[float] = None

mosi_last_t: Optional[int] = None
mosi_saw_advance = False

last_any_message_time = time.time()


# -----------------------------
# Waveform helpers
# -----------------------------
def collapse_levels(events: List[Event]) -> List[Tuple[int, int]]:
    """
    Collapse consecutive identical v values to obtain transitions (t, v).
    """
    events = sorted(events, key=lambda e: e.t)
    out: List[Tuple[int, int]] = []
    last_v: Optional[int] = None
    for e in events:
        if last_v is None or e.v != last_v:
            out.append((e.t, e.v))
            last_v = e.v
    return out


def extract_edges(clock_transitions: List[Tuple[int, int]], edge: str) -> List[int]:
    """
    Extract timestamps of clock edges (rising or falling) from collapsed transitions.
    """
    times: List[int] = []
    for i in range(1, len(clock_transitions)):
        t_prev, v_prev = clock_transitions[i - 1]
        t_now, v_now   = clock_transitions[i]
        if edge == "rising" and v_prev == 0 and v_now == 1:
            times.append(t_now)
        elif edge == "falling" and v_prev == 1 and v_now == 0:
            times.append(t_now)
    return times


def sample_level_at(transitions: List[Tuple[int, int]], t_edge: int) -> int:
    """
    Return the line level at time t_edge given transitions [(t, v), ...].
    Linear scan is fine for the challenge capture sizes.
    """
    if not transitions:
        return 0
    cur = transitions[0][1]
    for t, v in transitions:
        if t > t_edge:
            break
        cur = v
    return cur


def bits_to_bytes(bits: List[int], bit_order: str) -> bytes:
    """
    Pack bits into bytes.
      - msb: b = (b<<1) | bit
      - lsb: b |= bit << position
    """
    out = bytearray()
    n = len(bits) - (len(bits) % 8)

    if bit_order == "msb":
        for i in range(0, n, 8):
            b = 0
            for bit in bits[i:i+8]:
                b = (b << 1) | (bit & 1)
            out.append(b)

    elif bit_order == "lsb":
        for i in range(0, n, 8):
            b = 0
            for pos, bit in enumerate(bits[i:i+8]):
                b |= (bit & 1) << pos
            out.append(b)
    else:
        raise ValueError("bit_order must be 'msb' or 'lsb'")

    return bytes(out)


def xor_repeat(data: bytes, key: bytes) -> bytes:
    """
    XOR with a repeating key (same operation for encrypt/decrypt).
    """
    return bytes(b ^ key[i % len(key)] for i, b in enumerate(data))


def hexdump_ascii(data: bytes, width: int = 16) -> str:
    """
    Hex + ASCII dump for write-ups.
    """
    def p(x: int) -> str:
        return chr(x) if 0x20 <= x <= 0x7E else "."
    lines = []
    for i in range(0, len(data), width):
        chunk = data[i:i+width]
        hexpart = " ".join(f"{b:02x}" for b in chunk)
        asciipart = "".join(p(b) for b in chunk)
        lines.append(f"{i:04x}  {hexpart:<{width*3}}  {asciipart}")
    return "\n".join(lines)


# -----------------------------
# Brute-force SPI decode scoring
# -----------------------------
def shift_bits(bits: List[int], shift: int) -> List[int]:
    """
    Drop the first 'shift' bits to realign byte boundaries.
    shift is 0..7.
    """
    if shift <= 0:
        return bits
    if shift >= len(bits):
        return []
    return bits[shift:]


def score_plaintext(data: bytes) -> float:
    """
    Heuristic scoring:
    - printable ratio
    - presence of common tokens that appear in HHC hint payloads
    """
    if not data:
        return -1.0

    printable = sum(
        1 for x in data
        if 0x20 <= x <= 0x7E or x in (0x0A, 0x0D, 0x09)
    )
    ratio = printable / len(data)

    s = "".join(chr(x) if 0x20 <= x <= 0x7E else " " for x in data).lower()

    tokens = [
        "key", "xor", "addr", "address", "i2c", "target", "device",
        "use", "decrypt", "payload", "{", "}", ":", "=", "[", "]"
    ]
    hits = sum(1 for t in tokens if t in s)

    # printable ratio dominates; token hits add small bonus
    return ratio + (0.15 * hits)


def try_all_spi_decodes(sck_events: List[Event], mosi_events: List[Event], xor_key: bytes):
    """
    Try combinations:
      - edge: rising/falling
      - bit_order: msb/lsb
      - bit_shift: 0..7

    Returns list of tuples:
      (score, edge, bit_order, shift, raw_bytes, decrypted_bytes)
    sorted by descending score.
    """
    sck_trans = collapse_levels(sck_events)
    mosi_trans = collapse_levels(mosi_events)

    candidates = []
    for edge in ("rising", "falling"):
        edge_times = extract_edges(sck_trans, edge)
        bits = [sample_level_at(mosi_trans, t) for t in edge_times]

        for bit_order in ("msb", "lsb"):
            for shift in range(0, 8):
                bits2 = shift_bits(bits, shift)
                raw = bits_to_bytes(bits2, bit_order)
                dec = xor_repeat(raw, xor_key)
                sc = score_plaintext(dec)
                candidates.append((sc, edge, bit_order, shift, raw, dec))

    candidates.sort(key=lambda x: x[0], reverse=True)
    return candidates


# -----------------------------
# WebSocket callbacks
# -----------------------------
def on_message_sck(ws, message: str):
    global last_any_message_time
    obj = json.loads(message)
    if not isinstance(obj, dict):
        return
    if obj.get("line") != "sck":
        return
    if "t" not in obj or "v" not in obj:
        return

    e = Event(t=int(obj["t"]), v=int(obj["v"]), marker=obj.get("marker"))
    with lock:
        last_any_message_time = time.time()
        wire_frames["sck"].append(e)


def on_message_mosi(ws, message: str):
    global done, wrap_detected, wrap_wallclock_time
    global mosi_last_t, mosi_saw_advance, last_any_message_time

    obj = json.loads(message)
    if not isinstance(obj, dict):
        return
    if obj.get("line") != "mosi":
        return
    if "t" not in obj or "v" not in obj:
        return

    t = int(obj["t"])
    v = int(obj["v"])
    e = Event(t=t, v=v, marker=obj.get("marker"))

    with lock:
        last_any_message_time = time.time()
        wire_frames["mosi"].append(e)

        if mosi_last_t is None:
            mosi_last_t = t
            return

        if t > mosi_last_t:
            mosi_saw_advance = True

        # loop boundary: time drops after we have seen time advance
        if mosi_saw_advance and t < mosi_last_t and not wrap_detected:
            wrap_detected = True
            wrap_wallclock_time = time.time()

        mosi_last_t = t


def on_error(ws, error):
    print("WebSocket error:", error)


def on_close(ws, close_status_code, close_msg):
    # Normal when we stop capture
    pass


# -----------------------------
# Supervisor
# -----------------------------
def supervisor(ws_sck: websocket.WebSocketApp, ws_mosi: websocket.WebSocketApp):
    global done

    started = time.time()
    while True:
        time.sleep(0.05)

        with lock:
            now = time.time()

            # hard cap
            if now - started > MAX_CAPTURE_SECONDS:
                done = True

            # if loop wrap detected, wait a brief grace window then stop
            if wrap_detected and wrap_wallclock_time is not None:
                if (now - wrap_wallclock_time) * 1000.0 >= POST_WRAP_GRACE_MS:
                    done = True

            if done:
                break

    try:
        ws_sck.close()
    except Exception:
        pass
    try:
        ws_mosi.close()
    except Exception:
        pass


def slice_one_loop_by_t_reset(events: List[Event]) -> List[Event]:
    """
    Slice exactly one loop based on the first detected wrap (t decreases).
    Uses the event ordering by t. Works well for the HHC looped streams.
    """
    if not events:
        return events

    ev = sorted(events, key=lambda e: e.t)
    # find first t==0 (common) or a wrap point; prefer t==0 if present
    first0 = next((i for i, e in enumerate(ev) if e.t == 0), None)
    if first0 is None:
        # fallback: find first drop
        for i in range(1, len(ev)):
            if ev[i].t < ev[i-1].t:
                return ev[:i]
        return ev

    # find next t==0 after time advanced
    advanced = False
    for j in range(first0 + 1, len(ev)):
        if ev[j].t != 0:
            advanced = True
            continue
        if advanced and ev[j].t == 0:
            return ev[first0:j]

    return ev[first0:]


# -----------------------------
# Main
# -----------------------------
def main():
    global done

    ws_sck = websocket.WebSocketApp(
        "wss://signals.holidayhackchallenge.com/wire/sck",
        on_message=on_message_sck,
        on_error=on_error,
        on_close=on_close,
    )
    ws_mosi = websocket.WebSocketApp(
        "wss://signals.holidayhackchallenge.com/wire/mosi",
        on_message=on_message_mosi,
        on_error=on_error,
        on_close=on_close,
    )

    t_sck = threading.Thread(target=ws_sck.run_forever, daemon=True)
    t_mosi = threading.Thread(target=ws_mosi.run_forever, daemon=True)
    t_sck.start()
    t_mosi.start()

    sup = threading.Thread(target=supervisor, args=(ws_sck, ws_mosi), daemon=True)
    sup.start()
    sup.join()

    with lock:
        sck_all = list(wire_frames["sck"])
        mosi_all = list(wire_frames["mosi"])

    # Save raw capture for write-up reproducibility
    Path("spi_sck_live.json").write_text(
        "\n".join(json.dumps({"t": e.t, "v": e.v, "marker": e.marker}) for e in sck_all),
        encoding="utf-8",
    )
    Path("spi_mosi_live.json").write_text(
        "\n".join(json.dumps({"t": e.t, "v": e.v, "marker": e.marker}) for e in mosi_all),
        encoding="utf-8",
    )

    if not sck_all or not mosi_all:
        raise SystemExit("Capture failed: missing SCK or MOSI data.")

    # Slice to one loop using MOSI's t-reset behavior
    mosi_loop = slice_one_loop_by_t_reset(mosi_all)

    # Align SCK window roughly to MOSI loop time range
    t_min = min(e.t for e in mosi_loop)
    t_max = max(e.t for e in mosi_loop)
    sck_loop = [e for e in sck_all if t_min <= e.t <= t_max]

    print("\n--- Capture summary ---")
    print(f"MOSI events (all) : {len(mosi_all)}")
    print(f"SCK  events (all) : {len(sck_all)}")
    print(f"MOSI events (loop): {len(mosi_loop)}  t=[{t_min}..{t_max}]")
    print(f"SCK  events (win) : {len(sck_loop)}  t=[{t_min}..{t_max}]")

    # Brute-force decode matrix
    cands = try_all_spi_decodes(sck_loop, mosi_loop, xor_key=XOR_KEY)

    print("\n--- Top SPI decode candidates ---")
    for sc, edge, order, shift, _raw, dec in cands[:8]:
        printable = "".join(chr(b) for b in dec if 0x20 <= b <= 0x7E)
        print(f"\nscore={sc:.3f} edge={edge:7} order={order:3} shift={shift}  bytes={len(dec)}")
        print(printable[:240])

    # Print full dump for the best candidate
    best = cands[0]
    sc, edge, order, shift, raw, dec = best

    print("\n=== BEST CANDIDATE (full dump) ===")
    print(f"score={sc:.3f} edge={edge} bit_order={order} bit_shift={shift}")
    print("\n--- RAW ---")
    print(hexdump_ascii(raw))
    print("\n--- DECRYPTED (XOR key='icy') ---")
    print(hexdump_ascii(dec))

    printable_full = "".join(chr(b) for b in dec if 0x20 <= b <= 0x7E)
    print("\n--- Decrypted printable-only (full) ---")
    print(printable_full)


if __name__ == "__main__":
    main()

This script outputted:

SPI output SPI output

Decryptred text is:

read and decrypt the I2C bus data using the XOR key: bananza. the temperature sensor address is 0x3

I2C

#!/usr/bin/env python3
"""
SANS HHC 2025 – On the Wire – I2C (LIVE, final)

Known facts:
- Target I²C device (7-bit) = 0x3C
- XOR key = "bananza"
- Expected answer format = xx.xx

Observed bus behavior in your run:
- No READ transactions in this segment (R=0, W=2), so decode WRITE payloads.
- Correct framing is "Mode A" (no ACK bits present in collected address-bit/data-bit stream).
- Decrypted payload for 0x3C contains ASCII temperature (e.g., "32.84").

What this script does:
1) Capture one broadcast loop (timestamp wrap) from SDA/SCL
2) Parse I²C transactions using start/stop markers
3) For last transaction to 0x3C:
   - Convert bits->bytes using Mode A (pack every 8 bits)
   - XOR-decrypt with "bananza"
   - If decrypted text matches r"^\\d{2}\\.\\d{2}$", print SUBMIT: xx.xx
   - Otherwise print diagnostics and (optional) fallback interpretations
"""

import json
import re
import threading
import time
from dataclasses import dataclass
from typing import List, Optional

import websocket


SDA_ENDPOINT = "wss://signals.holidayhackchallenge.com/wire/sda"
SCL_ENDPOINT = "wss://signals.holidayhackchallenge.com/wire/scl"

TARGET_ADDRESS = 0x3C
XOR_KEY = b"bananza"

MAX_CAPTURE_SECONDS = 25.0
POST_WRAP_GRACE_MS = 200

RE_ANSWER = re.compile(r"^\d{2}\.\d{2}$")


@dataclass
class BitEvent:
    t: int
    v: int
    marker: Optional[str]


@dataclass
class TxBits:
    address: int
    rw: int
    bits_after_addr: List[int]


lock = threading.Lock()
events: List[BitEvent] = []

wrap_detected = False
wrap_wallclock_time: Optional[float] = None
last_t: Optional[int] = None
saw_advance = False


def xor_decrypt(data: bytes, key: bytes) -> bytes:
    return bytes(b ^ key[i % len(key)] for i, b in enumerate(data))


def assemble_byte_msb(bits8: List[int]) -> int:
    b = 0
    for bit in bits8:
        b = (b << 1) | (bit & 1)
    return b


def bits_to_bytes_mode_a(bits: List[int]) -> bytes:
    """
    Mode A: ACK bits are NOT present in the collected bitstream.
    Pack sequential 8-bit groups.
    """
    out = bytearray()
    n = len(bits) - (len(bits) % 8)
    for i in range(0, n, 8):
        out.append(assemble_byte_msb(bits[i:i+8]))
    return bytes(out)


def parse_i2c_transactions(events_in: List[BitEvent]) -> List[TxBits]:
    """
    START ... STOP framing.
    Collect only address-bit and data-bit values (0/1).
    The first 8 bits after START are the address byte (MSB-first).
    """
    txs: List[TxBits] = []
    in_tx = False
    bits: List[int] = []

    for e in events_in:
        if e.marker == "start":
            in_tx = True
            bits = []
            continue

        if e.marker == "stop":
            if in_tx and len(bits) >= 8:
                addr_byte = assemble_byte_msb(bits[:8])
                address = addr_byte >> 1
                rw = addr_byte & 1
                txs.append(TxBits(address=address, rw=rw, bits_after_addr=bits[8:]))
            in_tx = False
            bits = []
            continue

        if in_tx and e.marker in ("address-bit", "data-bit"):
            bits.append(e.v)

    return txs


def on_message_sda(ws, message: str):
    global wrap_detected, wrap_wallclock_time, last_t, saw_advance
    obj = json.loads(message)
    if not isinstance(obj, dict) or obj.get("line") != "sda":
        return

    t = int(obj.get("t", 0))
    v = int(obj.get("v", 0))
    marker = obj.get("marker")

    with lock:
        events.append(BitEvent(t=t, v=v, marker=marker))

        if last_t is not None:
            if t > last_t:
                saw_advance = True
            elif saw_advance and t < last_t and not wrap_detected:
                wrap_detected = True
                wrap_wallclock_time = time.time()
        last_t = t


def on_message_scl(ws, message: str):
    # Not used; kept to mirror physical bus capture.
    pass


def supervisor(ws_sda, ws_scl):
    start = time.time()
    while True:
        time.sleep(0.05)
        now = time.time()

        if now - start > MAX_CAPTURE_SECONDS:
            break
        if wrap_detected and wrap_wallclock_time is not None:
            if (now - wrap_wallclock_time) * 1000 >= POST_WRAP_GRACE_MS:
                break

    ws_sda.close()
    ws_scl.close()


def main():
    ws_sda = websocket.WebSocketApp(SDA_ENDPOINT, on_message=on_message_sda)
    ws_scl = websocket.WebSocketApp(SCL_ENDPOINT, on_message=on_message_scl)

    t1 = threading.Thread(target=ws_sda.run_forever, daemon=True)
    t2 = threading.Thread(target=ws_scl.run_forever, daemon=True)
    t1.start()
    t2.start()

    supervisor(ws_sda, ws_scl)

    with lock:
        captured = list(events)

    txs = parse_i2c_transactions(captured)
    target = [tx for tx in txs if tx.address == TARGET_ADDRESS]

    if not target:
        print(f"No transactions seen for 0x{TARGET_ADDRESS:02x}")
        return

    chosen = target[-1]  # value write is typically last
    enc = bits_to_bytes_mode_a(chosen.bits_after_addr)
    dec = xor_decrypt(enc, XOR_KEY)

    # Try ASCII first (this challenge returns ASCII temperature)
    try:
        text = dec.decode("ascii", errors="strict")
    except UnicodeDecodeError:
        text = ""

    text = text.strip()

    print(f"Target 0x{TARGET_ADDRESS:02x} transactions: {len(target)} (using last)")
    print(f"Chosen TX rw={chosen.rw} bits_after_addr={len(chosen.bits_after_addr)}")
    print(f"Decrypted ASCII: {repr(text)}")
    print(f"Decrypted HEX  : {dec.hex()}")

    if RE_ANSWER.fullmatch(text):
        print(f"\nSUBMIT: {text}")
        return

    # If not a direct xx.xx string, provide guidance for manual inspection.
    print("\nDecrypted output did not match xx.xx. Inspect the ASCII/HEX above.")
    print("If the value is embedded, search the ASCII for a pattern like \\d\\d\\.\\d\\d.")


if __name__ == "__main__":
    main()

This script outputted:

I2C output I2C output

The value I was looking for is:

32.84

Embedded protocols often trade simplicity for security, making protocol literacy a critical defensive skill.

Evan Booth closing words

After solving, Evan says:

Nice work! You cracked that signal encoding like a pro.

Turns out the weirdness had a method to it after all - just like most of my builds!

Hints

On Rails

  1. Connect to the captured wire files or endpoints for the relevant wires.
  2. Collect all frames for the transmission (buffer until inactivity or loop boundary).
  3. Identify protocol from wire names (e.g., dq → 1-Wire; mosi/sck → SPI; sda/scl → I²C).
  4. Decode the raw signal:
  • Pulse-width protocols: locate falling→rising transitions and measure low-pulse width.
  • Clocked protocols: detect clock edges and sample the data line at the specified sampling phase.
  1. Assemble bits into bytes taking the correct bit order (LSB vs MSB).

  2. Convert bytes to text (printable ASCII or hex as appropriate).

  3. Extract information from the decoded output — it contains the XOR key or other hints for the next stage.

  4. Repeat Stage 1 decoding to recover raw bytes (they will appear random).

  5. Apply XOR decryption using the key obtained from the previous stage.

  6. Inspect decrypted output for next-stage keys or target device information.

  • Multiple 7-bit device addresses share the same SDA/SCL lines.
  • START condition: SDA falls while SCL is high. STOP: SDA rises while SCL is high.
  • First byte of a transaction = (7-bit address « 1) | R/W. Extract address with address = first_byte » 1.
  • Identify and decode every device’s transactions; decrypt only the target device’s payload.
  • Print bytes in hex and as ASCII (if printable) — hex patterns reveal structure.
  • Check printable ASCII range (0x20–0x7E) to spot valid text.
  • Verify endianness: swapping LSB/MSB will quickly break readable text.
  • For XOR keys, test short candidate keys and look for common English words.
  • If you connect mid-broadcast, wait for the next loop or detect a reset/loop marker before decoding.
  • Buffering heuristic: treat the stream complete after a short inactivity window (e.g., 500 ms) or after a full broadcast loop.
  • Sort frames by timestamp per wire and collapse consecutive identical levels before decoding to align with the physical waveform.

Protocols

Key concept - Clock vs. Data signals:

  • Some protocols have separate clock and data lines (like SPI and I2C)
  • For clocked protocols, you need to sample the data line at specific moments defined by the clock
  • The clock signal tells you when to read the data signal

For 1-Wire (no separate clock):

  • Information is encoded in pulse widths (how long the signal stays low or high)
  • Different pulse widths represent different bit values
  • Look for patterns in the timing between transitions

For SPI and I2C:

  • Identify which line is the clock (SCL for I2C, SCK for SPI)
  • Data is typically valid/stable when the clock is in a specific state (high or low)
  • You need to detect clock edges (transitions) and sample data at those moments

Technical approach:

  • Sort frames by timestamp
  • Detect rising edges (0→1) and falling edges (1→0) on the clock line
  • Sample the data line’s value at each clock edge

Bits and Bytes

Critical detail - Bit ordering varies by protocol:

MSB-first (Most Significant Bit first):

  • SPI and I2C typically send the highest bit (bit 7) first
  • When assembling bytes: byte = (byte « 1) | bit_value
  • Start with an empty byte, shift left, add the new bit

LSB-first (Least Significant Bit first):

  • 1-Wire and UART send the lowest bit (bit 0) first
  • When assembling bytes: byte |= bit_value « bit_position
  • Build the byte from bit 0 to bit 7

I2C specific considerations

  • Every 9th bit is an ACK (acknowledgment) bit - ignore these when decoding data
  • The first byte in each transaction is the device address (7 bits) plus a R/W bit
  • You may need to filter for specific device addresses

Converting bytes to text:

String.fromCharCode(byte_value) // Converts byte to ASCII character

Garbage?

If your decoded data looks like gibberish:

  • The data may be encrypted with XOR cipher
  • XOR is a simple encryption: encrypted_byte XOR key_byte = plaintext_byte
  • The same operation both encrypts and decrypts: plaintext XOR key = encrypted, encrypted XOR key = plaintext

How XOR cipher works:

function xorDecrypt(encrypted, key) {
  let result = "";
  for (let i = 0; i < encrypted.length; i++) {
    const encryptedChar = encrypted.charCodeAt(i);
    const keyChar = key.charCodeAt(i % key.length);  // Key repeats
    result += String.fromCharCode(encryptedChar ^ keyChar);
  }
  return result;
}

Key characteristics:

  • The key is typically short and repeats for the length of the message
  • You need the correct key to decrypt (look for keys in previous stage messages)
  • If you see readable words mixed with garbage, you might have the wrong key or bit order

Testing your decryption:

  • Encrypted data will have random-looking byte values
  • Decrypted data should be readable ASCII text
  • Try different keys from messages you’ve already decoded

Structure

What you’re dealing with:

  • You have access to WebSocket endpoints that stream digital signal data
  • Each endpoint represents a physical wire in a hardware communication system
  • The data comes as JSON frames with three properties: line (wire name), t (timestamp), and v (value: 0 or 1)
  • The server continuously broadcasts signal data in a loop - you can connect at any time
  • This is a multi-stage challenge where solving one stage reveals information needed for the next

Where to start:

  • Connect to a WebSocket endpoint and observe the data format
  • The server automatically sends data every few seconds - just wait and collect
  • Look for documentation on the protocol types mentioned (1-Wire, SPI, I2C)
  • Consider that hardware protocols encode information in the timing and sequence of signal transitions, not just the values themselves
  • Consider capturing the WebSocket frames to a file so you can work offline