diff --git a/README.md b/README.md index 9311632..3ec3123 100644 --- a/README.md +++ b/README.md @@ -1 +1,205 @@ -# `team1k` \ No newline at end of file +# team1k + +Python control library and server for the TEAM1k X-ray detector. + +Communicates directly with the detector FPGA via UDP, replacing the previous `k_test` C++ program. Uses EPICS PV Access (PVA) for command/status and ZMQ for bulk data transfer. + +## Architecture + +``` +Client machine Detector machine ++--------------+ +-------------------------------+ +| | PVA (commands/status) | team1k-server | +| Client |<---------------------->| Register I/O (UDP:42000) | +| | | PVA server | +| | ZMQ (frame data) | PVA streamer thread | +| |<---------------------->| Frame capture thread | +| | | ZMQ data transfer thread | ++--------------+ | | + | Acquisition subprocess | + | UDP data recv (UDP:41000) | + | Frame assembly | + | -> locking_shmring | + +-------------------------------+ +``` + +## Installation + +```bash +pip install . +``` + +For HDF5 support (optional): + +```bash +pip install ".[hdf5]" +``` + +### Dependencies + +- `locking_shmring` — shared memory ring buffer (must be installed separately) +- `p4p` — EPICS PV Access for Python +- `pyzmq` — ZeroMQ for bulk data transfer +- `numpy`, `pyserial` + +## Running the server + +```bash +team1k-server --detector-ip 10.0.0.32 --log-level INFO +``` + +All options: + +``` +--detector-ip Detector IP address (default: 10.0.0.32) +--register-port Detector register port (default: 42000) +--data-port Detector data port (default: 41000) +--pv-prefix PVA prefix (default: TEAM1K:) +--zmq-port ZMQ data transfer port (default: 42005) +--config Parameter file to apply on startup +--bellow-port Bellow stage serial port (default: /dev/CameraBellowStage) +--log-level DEBUG, INFO, WARNING, ERROR (default: INFO) +``` + +## Client usage + +### Acquire frames + +```python +from team1k import Client, ExposureModes + +client = Client(data_host="detector-machine") + +# Configure +client.set_exposure_mode(ExposureModes.GLOBAL_SHUTTER_CDS) +client.set_integration_time(6.0) # ms + +# One-shot: start DAQ, capture 100 frames, stop DAQ +frames = client.acquire_frames(100) +print(frames.shape) # (100, 1024, 1024) +print(frames.dtype) # uint16 + +client.close() +``` + +### Manual DAQ control + +```python +client = Client(data_host="detector-machine") + +client.start_daq() + +# Grab frames from the running stream +batch1 = client.get_frames(50) +batch2 = client.get_frames(50) + +client.stop_daq() +client.close() +``` + +### Live image monitoring + +```python +client = Client() + +def on_frame(image): + print(f"Frame received: {image.shape}") + +client.monitor_image(on_frame) + +# ... later +client.stop_monitor("IMAGE") +client.close() +``` + +### Status + +```python +client = Client() + +print(client.get_status()) +print(client.is_acquiring()) +print(client.get_frame_rate()) +print(client.get_exposure_mode()) +print(client.get_integration_time()) + +client.close() +``` + +### Other commands + +```python +client.set_trigger_mode(TriggerModes.EXTERNAL) +client.set_adc_clock_freq(60.0) # MHz +client.set_adc_data_delay(0x1A7) +client.load_parameter_file("/path/to/params.txt") +client.set_test_mode(True) # FPGA test pattern +client.reset_connection() + +# Peripherals +client.insert_detector() +client.retract_detector() +client.power_on() +client.power_off() +``` + +## PV Access interface + +All PVs are prefixed with `TEAM1K:` by default. + +### Status PVs (read-only) + +| PV | Type | Description | +|----|------|-------------| +| `STATUS` | string[] | Server status | +| `ACQUIRING` | bool | DAQ running | +| `FRAME_RATE` | float | Current frame rate (Hz) | +| `FRAME_COUNT` | int | Total frames acquired | +| `EXPOSURE_MODE` | int | Current exposure mode (0-3) | +| `TRIGGER_MODE` | int | Current trigger mode | +| `INTEGRATION_TIME` | float | Integration time (ms) | +| `CAPTURE:STATUS` | string | IDLE / CAPTURING / READY / ERROR | +| `CAPTURE:PROGRESS` | int | Frames captured so far | +| `CAPTURE:TOTAL` | int | Frames requested | +| `IMAGE` | NTNDArray | Live image stream | + +### Command PVs (writable) + +| PV | Type | Description | +|----|------|-------------| +| `CMD:EXPOSURE_MODE` | int | Set exposure mode | +| `CMD:TRIGGER_MODE` | int | Set trigger mode | +| `CMD:INTEGRATION_TIME` | float | Set integration time (ms) | +| `CMD:START_DAQ` | int | 1=start, 0=stop | +| `CMD:CAPTURE` | int | Capture N frames | +| `CMD:ADC_CLOCK_FREQ` | float | Set ADC clock (MHz) | +| `CMD:ADC_DATA_DELAY` | int | Set ADC data delay | +| `CMD:PARAMETER_FILE` | string | Apply parameter file | +| `CMD:RESET` | int | 1=reset connection | +| `CMD:TEST_MODE` | int | 1=enable FPGA test data | + +## Package structure + +``` +src/team1k/ + detector/ # Direct UDP communication with detector FPGA + udp_transport.py # Low-level UDP socket + registers.py # Register read/write protocol + data_port.py # Data port + loopback registration + chip_config.py # Chip constants (1024x1024, packet layout) + commands.py # Detector commands (exposure, trigger, etc.) + adc.py # Si570 I2C clock programming + parameter_file.py # Parameter file parser + acquisition/ # High-throughput data path + receiver.py # Acquisition subprocess (UDP -> shmring) + filewriter/ # On-demand frame capture + capture.py # FrameCapture + ZMQ DataTransferServer + pva/ # EPICS PV Access + interface.py # PVA server, command/status PVs + streamer.py # shmring -> NTNDArray live stream + peripherals/ # Hardware peripherals + bellow_stage.py # Camera bellow stage (serial) + power_supply.py # Power supply control + server.py # Main server entry point + Client.py # PVA + ZMQ client library +``` diff --git a/pyproject.toml b/pyproject.toml index 0a29cf8..8cacec1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,6 +15,7 @@ dependencies = [ "numpy >= 1.24.0", "pyserial >= 3.5", "p4p >= 4.1.0", + "pyzmq >= 25.0.0", ] [project.optional-dependencies] diff --git a/src/team1k/Client.py b/src/team1k/Client.py index 1e8fdf4..f945da4 100644 --- a/src/team1k/Client.py +++ b/src/team1k/Client.py @@ -1,23 +1,44 @@ #!/usr/bin/env python3 """ -Team1k Client — PVA-based interface to the Team1k detector server. +Team1k Client — PVA + ZMQ interface to the Team1k detector server. -Replaces the previous ZMQ-based client with EPICS PV Access communication. -All commands are sent via pvput to command PVs, and status/data are -received via pvget or monitor subscriptions. +Commands and status use EPICS PV Access (p4p). +Bulk frame retrieval uses ZMQ for efficient transfer over the network. + +Typical usage:: + + client = Client() + client.set_exposure_mode(ExposureModes.GLOBAL_SHUTTER_CDS) + client.set_integration_time(6.0) + + # One-shot: start DAQ, grab 100 frames, stop DAQ + frames = client.acquire_frames(100) + print(frames.shape) # (100, 1024, 1024) + + # Or manual control + client.start_daq() + frames = client.get_frames(50) + client.stop_daq() + + client.close() """ import enum +import json +import time import logging -import threading from typing import Any, Callable import numpy as np +import zmq from p4p.client.thread import Context, Subscription logger = logging.getLogger(__name__) +DEFAULT_PV_PREFIX = "TEAM1K:" +DEFAULT_ZMQ_PORT = 42005 + class ExposureModes(enum.IntEnum): """Detector exposure modes.""" @@ -34,28 +55,33 @@ class TriggerModes(enum.IntEnum): class CommandError(Exception): - """Raised when a PVA command put fails.""" + """Raised when a PVA command put or ZMQ transfer fails.""" pass class Client: """ - PVA client for the Team1k detector server. + Client for the Team1k detector server. - All communication uses EPICS PV Access: - - Commands: pvput to {prefix}CMD:* PVs - - Status: pvget from {prefix}* status PVs - - Data: monitor on {prefix}IMAGE PV + Uses PVA for commands/status and ZMQ for bulk frame data transfer. Args: prefix: PV name prefix (default: "TEAM1K:"). + data_host: Hostname/IP of the server's ZMQ data port. + data_port: ZMQ port for frame data transfer. timeout: Default timeout in seconds for PVA operations. """ - def __init__(self, prefix: str = "TEAM1K:", timeout: float = 5.0): + def __init__(self, prefix: str = DEFAULT_PV_PREFIX, + data_host: str = "localhost", + data_port: int = DEFAULT_ZMQ_PORT, + timeout: float = 5.0): self._prefix = prefix + self._data_host = data_host + self._data_port = data_port self._timeout = timeout self._ctx = Context("pva") + self._zmq_ctx = zmq.Context() self._subscriptions: dict[str, Subscription] = {} def _pv(self, name: str) -> str: @@ -74,10 +100,115 @@ class Client: """Get the current value of a PV.""" pv_name = self._pv(pv_suffix) try: - return self._ctx.get(pv_name, timeout=self._timeout) + result = self._ctx.get(pv_name, timeout=self._timeout) + return result.value if hasattr(result, 'value') else result except Exception as e: raise CommandError(f"Failed to get {pv_name}: {e}") from e + # --- Frame retrieval (the primary data interface) --- + + def get_frames(self, num_frames: int, timeout: float = 120.0) -> np.ndarray: + """ + Capture frames from the running DAQ and transfer them. + + DAQ must already be running. This triggers a server-side capture of + N consecutive frames, waits for completion, then fetches the data + over ZMQ. + + Args: + num_frames: Number of frames to capture. + timeout: Maximum time to wait for capture + transfer (seconds). + + Returns: + numpy array of shape (num_frames, ny, nx) with dtype uint16. + + Raises: + CommandError: If capture fails, times out, or transfer errors. + """ + # Trigger server-side capture + self._put("CMD:CAPTURE", num_frames) + + # Poll capture status until READY + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + status = self._get("CAPTURE:STATUS") + if status == "READY": + break + if status == "ERROR": + raise CommandError("Server-side capture failed") + time.sleep(0.1) + else: + raise CommandError( + f"Capture timed out after {timeout}s " + f"(status={self._get('CAPTURE:STATUS')})" + ) + + # Fetch data over ZMQ + remaining = deadline - time.monotonic() + return self._fetch_zmq(timeout=max(remaining, 5.0)) + + def acquire_frames(self, num_frames: int, timeout: float = 120.0) -> np.ndarray: + """ + Start DAQ, capture N frames, stop DAQ, and return the data. + + Convenience method for one-shot acquisitions. Equivalent to:: + + client.start_daq() + frames = client.get_frames(num_frames) + client.stop_daq() + + Args: + num_frames: Number of frames to capture. + timeout: Maximum time to wait (seconds). + + Returns: + numpy array of shape (num_frames, ny, nx) with dtype uint16. + """ + self.start_daq() + try: + return self.get_frames(num_frames, timeout=timeout) + finally: + self.stop_daq() + + def _fetch_zmq(self, timeout: float = 60.0) -> np.ndarray: + """Fetch captured frame data from the server via ZMQ.""" + sock = self._zmq_ctx.socket(zmq.REQ) + sock.setsockopt(zmq.RCVTIMEO, int(timeout * 1000)) + sock.setsockopt(zmq.LINGER, 0) + sock.connect(f"tcp://{self._data_host}:{self._data_port}") + + try: + sock.send(b"FETCH") + parts = sock.recv_multipart() + + if len(parts) != 2: + raise CommandError(f"Unexpected ZMQ response: {len(parts)} parts") + + header = json.loads(parts[0]) + if "error" in header: + raise CommandError(f"Server error: {header['error']}") + + data = parts[1] + n = header["num_frames"] + ny = header["ny"] + nx = header["nx"] + dtype = np.dtype(header.get("dtype", "uint16")) + + expected_bytes = n * ny * nx * dtype.itemsize + if len(data) != expected_bytes: + raise CommandError( + f"Data size mismatch: got {len(data)}, " + f"expected {expected_bytes}" + ) + + arr = np.frombuffer(data, dtype=dtype).copy() + return arr.reshape(n, ny, nx) + + except zmq.Again: + raise CommandError(f"ZMQ fetch timed out after {timeout}s") + finally: + sock.close() + # --- Detector commands --- def set_exposure_mode(self, mode: ExposureModes | int) -> None: @@ -100,18 +231,6 @@ class Client: """Stop data acquisition.""" self._put("CMD:START_DAQ", 0) - def set_file_writing(self, enable: bool) -> None: - """Enable or disable file writing.""" - self._put("CMD:FILE_WRITING", 1 if enable else 0) - - def set_output_dir(self, path: str) -> None: - """Set the output directory for file writing.""" - self._put("CMD:OUTPUT_DIR", path) - - def set_file_format(self, fmt: str) -> None: - """Set file format: 'raw' or 'hdf5'.""" - self._put("CMD:FILE_FORMAT", fmt) - def set_adc_clock_freq(self, freq_mhz: float) -> None: """Set ADC clock frequency in MHz (50-100).""" self._put("CMD:ADC_CLOCK_FREQ", freq_mhz) @@ -155,74 +274,61 @@ class Client: def get_status(self) -> list[str]: """Get server status strings.""" val = self._get("STATUS") - return list(val.value) if hasattr(val, 'value') else list(val) + return list(val) if not isinstance(val, str) else [val] def is_acquiring(self) -> bool: """Check if DAQ is running.""" - val = self._get("ACQUIRING") - return bool(val.value if hasattr(val, 'value') else val) + return bool(self._get("ACQUIRING")) def get_frame_rate(self) -> float: """Get current frame rate in Hz.""" - val = self._get("FRAME_RATE") - return float(val.value if hasattr(val, 'value') else val) + return float(self._get("FRAME_RATE")) def get_frame_count(self) -> int: """Get total frames acquired.""" - val = self._get("FRAME_COUNT") - return int(val.value if hasattr(val, 'value') else val) + return int(self._get("FRAME_COUNT")) def get_exposure_mode(self) -> int: """Get current exposure mode.""" - val = self._get("EXPOSURE_MODE") - return int(val.value if hasattr(val, 'value') else val) + return int(self._get("EXPOSURE_MODE")) def get_trigger_mode(self) -> int: """Get current trigger mode.""" - val = self._get("TRIGGER_MODE") - return int(val.value if hasattr(val, 'value') else val) + return int(self._get("TRIGGER_MODE")) def get_integration_time(self) -> float: """Get current integration time in ms.""" - val = self._get("INTEGRATION_TIME") - return float(val.value if hasattr(val, 'value') else val) + return float(self._get("INTEGRATION_TIME")) - def is_file_writing(self) -> bool: - """Check if file writing is enabled.""" - val = self._get("FILE_WRITING") - return bool(val.value if hasattr(val, 'value') else val) + def get_capture_status(self) -> str: + """Get capture status (IDLE, CAPTURING, READY, ERROR).""" + return str(self._get("CAPTURE:STATUS")) - def get_output_dir(self) -> str: - """Get current output directory.""" - val = self._get("OUTPUT_DIR") - return str(val.value if hasattr(val, 'value') else val) - - def get_scan_number(self) -> int: - """Get current scan number.""" - val = self._get("SCAN_NUMBER") - return int(val.value if hasattr(val, 'value') else val) + def get_capture_progress(self) -> tuple[int, int]: + """Get capture progress as (captured, total).""" + progress = int(self._get("CAPTURE:PROGRESS")) + total = int(self._get("CAPTURE:TOTAL")) + return progress, total def get_bellow_position(self) -> int: """Get bellow stage position.""" - val = self._get("BELLOW:POSITION") - return int(val.value if hasattr(val, 'value') else val) + return int(self._get("BELLOW:POSITION")) def get_power_voltages(self) -> np.ndarray: """Get power supply voltage readings.""" - val = self._get("POWER:VOLTAGES") - return np.asarray(val.value if hasattr(val, 'value') else val) + return np.asarray(self._get("POWER:VOLTAGES")) # --- Monitors (live subscriptions) --- - def monitor_image(self, callback: Callable[[np.ndarray, int], None]) -> None: + def monitor_image(self, callback: Callable[[np.ndarray], None]) -> None: """ Subscribe to live image updates. - The callback receives (image_array, frame_count) for each new frame. + The callback receives a 2D numpy array for each new frame. Only one image monitor can be active at a time. Args: - callback: Function called with (numpy array, frame_count) for each frame. + callback: Function called with numpy array for each frame. """ self.stop_monitor("IMAGE") @@ -231,12 +337,10 @@ class Client: def _on_image(value): try: arr = value.value - # NTNDArray stores shape info in the dimension field if hasattr(value, 'dimension') and value.dimension: - dims = value.dimension - shape = tuple(d.size for d in dims) + shape = tuple(d.size for d in value.dimension) arr = arr.reshape(shape) - callback(arr, 0) + callback(arr) except Exception as e: logger.debug("Image monitor error: %s", e) @@ -277,20 +381,25 @@ class Client: self._subscriptions.clear() def close(self) -> None: - """Close the client and release resources.""" + """Close the client and release all resources.""" self.stop_all_monitors() self._ctx.close() + self._zmq_ctx.term() # Example usage if __name__ == "__main__": logging.basicConfig(level=logging.INFO) - client = Client() + client = Client(data_host="detector-machine") try: print("Status:", client.get_status()) - print("Acquiring:", client.is_acquiring()) - print("Frame rate:", client.get_frame_rate()) + + # One-shot acquisition of 10 frames + client.set_exposure_mode(ExposureModes.GLOBAL_SHUTTER_CDS) + client.set_integration_time(6.0) + frames = client.acquire_frames(10) + print(f"Got {frames.shape[0]} frames, shape={frames.shape}") except CommandError as e: print(f"Error: {e}") finally: diff --git a/src/team1k/filewriter/__init__.py b/src/team1k/filewriter/__init__.py index 5360f82..85422d5 100644 --- a/src/team1k/filewriter/__init__.py +++ b/src/team1k/filewriter/__init__.py @@ -1 +1 @@ -from .writer import FileWriter +from .capture import FrameCapture, DataTransferServer diff --git a/src/team1k/filewriter/capture.py b/src/team1k/filewriter/capture.py new file mode 100644 index 0000000..4b11e6f --- /dev/null +++ b/src/team1k/filewriter/capture.py @@ -0,0 +1,264 @@ +""" +On-demand frame capture and ZMQ data transfer. + +Instead of continuous local file writing, this module captures a requested +number of frames from the shared memory ring buffer into a temp file, then +serves them to remote clients via ZMQ. This is designed for the common case +where the server runs on a dedicated detector machine and clients need to +retrieve data over a (slower) network. + +Flow: + 1. Client triggers capture via PVA: pvput TEAM1K:CMD:CAPTURE + 2. FrameCapture thread reads N frames from shmring → temp file + 3. Client polls TEAM1K:CAPTURE:STATUS via PVA until "READY" + 4. Client fetches data via ZMQ: sends b"FETCH", receives [header, data] +""" + +import json +import logging +import tempfile +import threading +from pathlib import Path + +import numpy as np +import zmq + +from ..detector.chip_config import ChipConfig, TEAM1K_CONFIG + +logger = logging.getLogger(__name__) + +DEFAULT_ZMQ_PORT = 42005 + + +class FrameCapture(threading.Thread): + """ + Captures frames from shmring on demand. + + When triggered via capture(), reads N consecutive frames from the ring + buffer and writes them to a temp file. Status is tracked and exposed + for the PVA interface. + """ + + def __init__(self, ring_name: str, chip_config: ChipConfig = TEAM1K_CONFIG): + super().__init__(daemon=True, name="team1k-capture") + self._ring_name = ring_name + self._config = chip_config + self._shutdown = threading.Event() + + # Capture state + self._capture_event = threading.Event() + self._capture_done = threading.Event() + self._num_requested = 0 + self._num_captured = 0 + self._status = "IDLE" + self._error_msg = "" + self._temp_path: str | None = None + self._data_lock = threading.Lock() + + @property + def status(self) -> str: + return self._status + + @property + def num_captured(self) -> int: + return self._num_captured + + @property + def num_requested(self) -> int: + return self._num_requested + + @property + def error_message(self) -> str: + return self._error_msg + + def capture(self, num_frames: int) -> None: + """Request capture of N frames from the ring buffer.""" + self._num_requested = num_frames + self._num_captured = 0 + self._status = "CAPTURING" + self._error_msg = "" + self._capture_done.clear() + self._capture_event.set() + logger.info("Capture requested: %d frames", num_frames) + + def wait_complete(self, timeout: float | None = None) -> bool: + """Wait for current capture to complete. Returns True if complete.""" + return self._capture_done.wait(timeout) + + def get_data(self) -> tuple[dict, bytes] | None: + """ + Read captured data. Returns (header_dict, raw_bytes) or None. + + The caller should hold this briefly — a new capture will block + until this returns. + """ + with self._data_lock: + if self._status != "READY" or not self._temp_path: + return None + header = { + "num_frames": self._num_captured, + "nx": self._config.image_nx, + "ny": self._config.image_ny, + "dtype": "uint16", + } + try: + data = Path(self._temp_path).read_bytes() + except Exception as e: + logger.error("Failed to read capture file: %s", e) + return None + return header, data + + def run(self) -> None: + """Main thread loop: wait for capture requests.""" + from locking_shmring import RingBufferReader + + reader = RingBufferReader(self._ring_name) + logger.info("Frame capture connected to ring buffer '%s'", self._ring_name) + + try: + while not self._shutdown.is_set(): + if not self._capture_event.wait(0.2): + continue + self._capture_event.clear() + self._do_capture(reader) + except Exception as e: + logger.error("Frame capture thread error: %s", e) + finally: + reader.close() + + def _do_capture(self, reader) -> None: + """Capture frames to a temp file.""" + # Clean up previous temp file (under lock so concurrent reads finish) + with self._data_lock: + if self._temp_path: + try: + Path(self._temp_path).unlink(missing_ok=True) + except Exception: + pass + self._temp_path = None + + try: + fd, temp_path = tempfile.mkstemp(suffix='.raw', prefix='team1k_capture_') + with open(fd, 'wb') as f: + for slot in reader.iter_frames(timeout_ms=5000): + if self._shutdown.is_set(): + self._status = "ERROR" + self._error_msg = "Shutdown during capture" + self._capture_done.set() + return + + with slot: + frame = slot.data_as_numpy( + (self._config.image_ny, self._config.image_nx), + np.uint16, + ) + f.write(frame.tobytes()) + self._num_captured += 1 + + if self._num_captured >= self._num_requested: + break + + if self._num_captured < self._num_requested: + self._status = "ERROR" + self._error_msg = ( + f"Only captured {self._num_captured}/{self._num_requested} " + f"frames (ring timeout)" + ) + logger.error(self._error_msg) + else: + with self._data_lock: + self._temp_path = temp_path + self._status = "READY" + logger.info("Capture complete: %d frames (%s)", + self._num_captured, temp_path) + + except Exception as e: + self._status = "ERROR" + self._error_msg = str(e) + logger.error("Capture error: %s", e) + + self._capture_done.set() + + def stop(self) -> None: + """Stop the capture thread and clean up.""" + self._shutdown.set() + if self._temp_path: + try: + Path(self._temp_path).unlink(missing_ok=True) + except Exception: + pass + + +class DataTransferServer(threading.Thread): + """ + ZMQ server for transferring captured frame data to clients. + + Runs a REP socket. Supports two request types: + b"FETCH" — returns [header_json, frame_data_bytes] + b"STATUS" — returns JSON with capture status + """ + + def __init__(self, capture: FrameCapture, chip_config: ChipConfig = TEAM1K_CONFIG, + port: int = DEFAULT_ZMQ_PORT): + super().__init__(daemon=True, name="team1k-data-transfer") + self._capture = capture + self._config = chip_config + self._port = port + self._shutdown = threading.Event() + + def run(self) -> None: + ctx = zmq.Context() + sock = ctx.socket(zmq.REP) + sock.bind(f"tcp://*:{self._port}") + sock.setsockopt(zmq.RCVTIMEO, 200) + logger.info("Data transfer server listening on tcp://*:%d", self._port) + + try: + while not self._shutdown.is_set(): + try: + msg = sock.recv() + except zmq.Again: + continue + self._handle(sock, msg) + finally: + sock.close() + ctx.term() + + def _handle(self, sock: zmq.Socket, msg: bytes) -> None: + try: + request = msg.decode('utf-8').strip() + except UnicodeDecodeError: + sock.send(json.dumps({"error": "Invalid request"}).encode()) + return + + if request == "FETCH": + self._handle_fetch(sock) + elif request == "STATUS": + sock.send(json.dumps({ + "status": self._capture.status, + "num_captured": self._capture.num_captured, + "num_requested": self._capture.num_requested, + "error": self._capture.error_message, + }).encode()) + else: + sock.send(json.dumps({"error": f"Unknown request: {request}"}).encode()) + + def _handle_fetch(self, sock: zmq.Socket) -> None: + result = self._capture.get_data() + if result is None: + sock.send_multipart([ + json.dumps({ + "error": "No data available", + "status": self._capture.status, + }).encode(), + b"", + ]) + return + + header, data = result + sock.send_multipart([json.dumps(header).encode(), data]) + logger.info("Sent %d frames (%d bytes) to client", + header["num_frames"], len(data)) + + def stop(self) -> None: + self._shutdown.set() diff --git a/src/team1k/filewriter/writer.py b/src/team1k/filewriter/writer.py deleted file mode 100644 index 21296ea..0000000 --- a/src/team1k/filewriter/writer.py +++ /dev/null @@ -1,317 +0,0 @@ -""" -File writing from shared memory ring buffer. - -Ports KTestFunctions::FileWritingThread from KTestFunctions.cxx (lines 1198-1241). -Supports both raw binary (backward compatible) and HDF5 formats. -""" - -import os -import struct -import shutil -import logging -import threading -from pathlib import Path - -import numpy as np - -from ..detector.chip_config import ChipConfig, TEAM1K_CONFIG - -logger = logging.getLogger(__name__) - -# Minimum free disk space in MB before stopping writes -_MIN_FREE_SPACE_MB = 256 - -# Scan number persistence file -_SCAN_NUMBER_FILE = "/usr/local/k_test/parameters/last_scan_number.dat" - - -class FileWriter: - """ - Reads frames from locking_shmring and writes to disk. - - Supports two formats: - - 'raw': Binary files matching the C++ k_test format (4-byte header + pixel data) - - 'hdf5': HDF5 files with chunked datasets and metadata - - Runs as a background thread in the main process. - """ - - def __init__(self, ring_name: str, chip_config: ChipConfig = TEAM1K_CONFIG): - self._ring_name = ring_name - self._config = chip_config - self._thread: threading.Thread | None = None - self._running = False - - # Configuration (can be changed while running) - self._enabled = threading.Event() - self._output_dir = "/data" - self._format = "raw" # or "hdf5" - self._scan_number = self._load_scan_number() - self._lock = threading.Lock() - - # Stats - self.frames_written: int = 0 - self.bytes_written: int = 0 - - def _load_scan_number(self) -> int: - """Load the last scan number from the persistent file.""" - try: - with open(_SCAN_NUMBER_FILE, 'rb') as f: - data = f.read(4) - if len(data) == 4: - return struct.unpack(' None: - """Save the current scan number to the persistent file.""" - try: - os.makedirs(os.path.dirname(_SCAN_NUMBER_FILE), exist_ok=True) - with open(_SCAN_NUMBER_FILE, 'wb') as f: - f.write(struct.pack(' int: - return self._scan_number - - @property - def output_dir(self) -> str: - return self._output_dir - - @property - def file_format(self) -> str: - return self._format - - @property - def is_enabled(self) -> bool: - return self._enabled.is_set() - - def enable(self, output_dir: str | None = None, file_format: str | None = None) -> None: - """ - Enable file writing. - - Args: - output_dir: Output directory (default: keep current). - file_format: File format, 'raw' or 'hdf5' (default: keep current). - """ - with self._lock: - if output_dir is not None: - self._output_dir = output_dir - if file_format is not None: - if file_format not in ('raw', 'hdf5'): - raise ValueError(f"Unsupported format: {file_format}") - self._format = file_format - - # Increment scan number - self._scan_number += 1 - self._save_scan_number() - - # Create scan directory - scan_dir = os.path.join(self._output_dir, - f"scan_{self._scan_number:010d}") - os.makedirs(scan_dir, exist_ok=True) - - self._enabled.set() - logger.info("File writing enabled: dir=%s, format=%s, scan=%d", - self._output_dir, self._format, self._scan_number) - - def disable(self) -> None: - """Disable file writing.""" - self._enabled.clear() - logger.info("File writing disabled") - - def start(self) -> None: - """Start the writer thread.""" - if self._thread and self._thread.is_alive(): - return - self._running = True - self._thread = threading.Thread(target=self._writer_loop, daemon=True, - name="team1k-filewriter") - self._thread.start() - logger.info("File writer thread started") - - def stop(self) -> None: - """Stop the writer thread.""" - self._running = False - self._enabled.clear() - if self._thread: - self._thread.join(timeout=5.0) - logger.info("File writer thread stopped") - - def _check_disk_space(self) -> bool: - """Check for sufficient free disk space.""" - try: - stat = shutil.disk_usage(self._output_dir) - free_mb = stat.free / (1024 * 1024) - if free_mb < _MIN_FREE_SPACE_MB: - logger.error("Disk almost full (%.0f MB free), stopping writes", free_mb) - return False - return True - except Exception as e: - logger.error("Could not check disk space: %s", e) - return False - - def _writer_loop(self) -> None: - """Main writer loop reading from shmring.""" - from locking_shmring import RingBufferReader - - reader = RingBufferReader(self._ring_name) - logger.info("File writer connected to ring buffer '%s'", self._ring_name) - - raw_fp = None - h5_file = None - h5_dataset = None - file_number = 0 - transfers_in_file = 0 - ntransfers_per_file = self._config.ntransfers_per_file - current_scan = 0 - - try: - for slot in reader.iter_frames(timeout_ms=200): - if not self._running: - slot.release() - break - - with slot: - if not self._enabled.is_set(): - # Close any open files - if raw_fp: - raw_fp.close() - raw_fp = None - if h5_file: - h5_file.close() - h5_file = None - h5_dataset = None - file_number = 0 - transfers_in_file = 0 - continue - - # Check if scan number changed (new enable call) - with self._lock: - scan = self._scan_number - fmt = self._format - out_dir = self._output_dir - - if scan != current_scan: - # Close old files - if raw_fp: - raw_fp.close() - raw_fp = None - if h5_file: - h5_file.close() - h5_file = None - h5_dataset = None - file_number = 0 - transfers_in_file = 0 - current_scan = scan - - # Check disk space periodically - if transfers_in_file == 0 and not self._check_disk_space(): - self._enabled.clear() - continue - - frame = slot.data_as_numpy( - (self._config.image_ny, self._config.image_nx), np.uint16 - ) - - if fmt == "raw": - raw_fp, file_number, transfers_in_file = self._write_raw( - frame, slot.frame_id, raw_fp, out_dir, scan, - file_number, transfers_in_file, ntransfers_per_file - ) - elif fmt == "hdf5": - h5_file, h5_dataset = self._write_hdf5( - frame, slot.frame_id, slot.timestamp_ns, - h5_file, h5_dataset, out_dir, scan, file_number - ) - - self.frames_written += 1 - self.bytes_written += self._config.image_size_bytes - - except Exception as e: - logger.error("File writer error: %s", e) - finally: - if raw_fp: - raw_fp.close() - if h5_file: - h5_file.close() - reader.close() - - def _write_raw(self, frame: np.ndarray, frame_id: int, - fp, out_dir: str, scan: int, - file_number: int, transfers_in_file: int, - ntransfers_per_file: int): - """Write a frame in raw binary format (backward compatible with C++ k_test).""" - if transfers_in_file == 0: - if fp: - fp.close() - filename = os.path.join( - out_dir, - f"scan_{scan:010d}", - f"team1k_ued_{scan:010d}_{file_number:010d}.dat" - ) - fp = open(filename, 'wb') - # Write first image number header - fp.write(struct.pack('= ntransfers_per_file: - transfers_in_file = 0 - file_number += 1 - - return fp, file_number, transfers_in_file - - def _write_hdf5(self, frame: np.ndarray, frame_id: int, timestamp_ns: int, - h5_file, h5_dataset, out_dir: str, scan: int, file_number: int): - """Write a frame in HDF5 format.""" - try: - import h5py - except ImportError: - logger.error("h5py not installed, cannot write HDF5 files") - self._enabled.clear() - return None, None - - if h5_file is None: - filename = os.path.join( - out_dir, - f"scan_{scan:010d}", - f"team1k_ued_{scan:010d}_{file_number:010d}.h5" - ) - h5_file = h5py.File(filename, 'w') - h5_dataset = h5_file.create_dataset( - 'data', - shape=(0, self._config.image_ny, self._config.image_nx), - maxshape=(None, self._config.image_ny, self._config.image_nx), - dtype=np.uint16, - chunks=(1, self._config.image_ny, self._config.image_nx), - ) - h5_file.create_dataset('timestamps', shape=(0,), maxshape=(None,), - dtype=np.float64) - h5_file.create_dataset('frame_ids', shape=(0,), maxshape=(None,), - dtype=np.uint64) - h5_file.attrs['scan_number'] = scan - - # Append frame - n = h5_dataset.shape[0] - h5_dataset.resize(n + 1, axis=0) - h5_dataset[n] = frame - - ts_ds = h5_file['timestamps'] - ts_ds.resize(n + 1, axis=0) - ts_ds[n] = timestamp_ns / 1e9 - - fid_ds = h5_file['frame_ids'] - fid_ds.resize(n + 1, axis=0) - fid_ds[n] = frame_id - - h5_file.flush() - - return h5_file, h5_dataset diff --git a/src/team1k/pva/interface.py b/src/team1k/pva/interface.py index de6deca..0ce500e 100644 --- a/src/team1k/pva/interface.py +++ b/src/team1k/pva/interface.py @@ -69,25 +69,21 @@ class PVAInterface: {prefix}EXPOSURE_MODE - int, current exposure mode {prefix}TRIGGER_MODE - int, current trigger mode {prefix}INTEGRATION_TIME - float, integration time ms - {prefix}FILE_WRITING - bool, file writing enabled - {prefix}OUTPUT_DIR - string, output directory - {prefix}SCAN_NUMBER - int, current scan number + {prefix}CAPTURE:STATUS - string, capture state (IDLE/CAPTURING/READY/ERROR) + {prefix}CAPTURE:PROGRESS - int, frames captured so far + {prefix}CAPTURE:TOTAL - int, frames requested Commands (writable): {prefix}CMD:EXPOSURE_MODE - int, set exposure mode {prefix}CMD:TRIGGER_MODE - int, set trigger mode {prefix}CMD:INTEGRATION_TIME - float, set integration time ms {prefix}CMD:START_DAQ - int, 1=start, 0=stop - {prefix}CMD:FILE_WRITING - int, 1=enable, 0=disable - {prefix}CMD:OUTPUT_DIR - string, set output directory - {prefix}CMD:FILE_FORMAT - string, "raw" or "hdf5" + {prefix}CMD:CAPTURE - int, capture N frames {prefix}CMD:ADC_CLOCK_FREQ - float, set ADC clock MHz {prefix}CMD:ADC_DATA_DELAY - int, set ADC data delay {prefix}CMD:PARAMETER_FILE - string, apply parameter file {prefix}CMD:RESET - int, 1=reset detector connection {prefix}CMD:TEST_MODE - int, 0/1 - {prefix}CMD:REGISTER_READ - int, read register at address - {prefix}CMD:REGISTER_WRITE - int array [address, value] Data: {prefix}IMAGE - NTNDArray, live image stream @@ -140,12 +136,12 @@ class PVAInterface: NTScalar('i'), 0) self._pvs[f"{p}INTEGRATION_TIME"] = self._make_status_pv( NTScalar('d'), 0.0) - self._pvs[f"{p}FILE_WRITING"] = self._make_status_pv( - NTScalar('?'), False) - self._pvs[f"{p}OUTPUT_DIR"] = self._make_status_pv( - NTScalar('s'), "/data") - self._pvs[f"{p}SCAN_NUMBER"] = self._make_status_pv( - NTScalar('l'), 0) + self._pvs[f"{p}CAPTURE:STATUS"] = self._make_status_pv( + NTScalar('s'), "IDLE") + self._pvs[f"{p}CAPTURE:PROGRESS"] = self._make_status_pv( + NTScalar('i'), 0) + self._pvs[f"{p}CAPTURE:TOTAL"] = self._make_status_pv( + NTScalar('i'), 0) # --- Command PVs (writable) --- self._pvs[f"{p}CMD:EXPOSURE_MODE"] = self._make_command_pv( @@ -156,12 +152,8 @@ class PVAInterface: NTScalar('d'), 0.0, "set_integration_time") self._pvs[f"{p}CMD:START_DAQ"] = self._make_command_pv( NTScalar('i'), 0, "start_stop_daq") - self._pvs[f"{p}CMD:FILE_WRITING"] = self._make_command_pv( - NTScalar('i'), 0, "set_file_writing") - self._pvs[f"{p}CMD:OUTPUT_DIR"] = self._make_command_pv( - NTScalar('s'), "", "set_output_dir") - self._pvs[f"{p}CMD:FILE_FORMAT"] = self._make_command_pv( - NTScalar('s'), "raw", "set_file_format") + self._pvs[f"{p}CMD:CAPTURE"] = self._make_command_pv( + NTScalar('i'), 0, "capture_frames") self._pvs[f"{p}CMD:ADC_CLOCK_FREQ"] = self._make_command_pv( NTScalar('d'), 60.0, "set_adc_clock_freq") self._pvs[f"{p}CMD:ADC_DATA_DELAY"] = self._make_command_pv( diff --git a/src/team1k/server.py b/src/team1k/server.py index eac9379..9a22d8b 100644 --- a/src/team1k/server.py +++ b/src/team1k/server.py @@ -11,7 +11,8 @@ Architecture: - Detector register read/write (UDP port 42000) - PVA server (commands, status, data) - PVA streamer thread (shmring -> NTNDArray) - - File writer thread (shmring -> raw/HDF5 files) + - Frame capture thread (shmring -> temp file, on demand) + - ZMQ data transfer server (temp file -> client) - Peripheral control (bellow stage, power supply) Acquisition subprocess: @@ -35,7 +36,7 @@ from .detector.chip_config import TEAM1K_CONFIG, configure_chip from .detector.parameter_file import apply_parameter_file from .acquisition.receiver import AcquisitionProcess -from .filewriter.writer import FileWriter +from .filewriter.capture import FrameCapture, DataTransferServer from .pva.interface import PVAInterface from .pva.streamer import PVAStreamer from .peripherals.bellow_stage import BellowStage @@ -49,6 +50,7 @@ DEFAULT_REGISTER_PORT = 42000 DEFAULT_DATA_PORT = 41000 DEFAULT_PV_PREFIX = "TEAM1K:" DEFAULT_BELLOW_PORT = "/dev/CameraBellowStage" +DEFAULT_ZMQ_PORT = 42005 class Team1kServer: @@ -56,7 +58,8 @@ class Team1kServer: Main server coordinating all subsystems. Manages direct UDP communication with the detector, data acquisition - via a dedicated subprocess, PVA streaming, and optional file writing. + via a dedicated subprocess, PVA streaming, and on-demand frame capture + with ZMQ data transfer to remote clients. """ def __init__(self, detector_ip: str = DEFAULT_DETECTOR_IP, @@ -64,7 +67,8 @@ class Team1kServer: data_port: int = DEFAULT_DATA_PORT, pv_prefix: str = DEFAULT_PV_PREFIX, config_file: str | None = None, - bellow_port: str = DEFAULT_BELLOW_PORT): + bellow_port: str = DEFAULT_BELLOW_PORT, + zmq_port: int = DEFAULT_ZMQ_PORT): self.detector_ip = detector_ip self.register_port = register_port @@ -90,8 +94,11 @@ class Team1kServer: # PVA interface self.pva = PVAInterface(self, prefix=pv_prefix) - # File writer (thread in main process) - self.file_writer = FileWriter("team1k_frames", self.chip_config) + # Frame capture + ZMQ data transfer (replaces local file writing) + self.frame_capture = FrameCapture("team1k_frames", self.chip_config) + self.data_server = DataTransferServer( + self.frame_capture, self.chip_config, port=zmq_port, + ) # PVA streamer will be created after PVA setup self._pva_streamer: PVAStreamer | None = None @@ -173,24 +180,14 @@ class Team1kServer: else: self._stop_daq() - elif command == "set_file_writing": - if int(value): - self.file_writer.enable() - self.pva.post_status("FILE_WRITING", True) - self.pva.post_status("SCAN_NUMBER", self.file_writer.scan_number) - else: - self.file_writer.disable() - self.pva.post_status("FILE_WRITING", False) - - elif command == "set_output_dir": - self.file_writer._output_dir = str(value) - self.pva.post_status("OUTPUT_DIR", str(value)) - - elif command == "set_file_format": - fmt = str(value) - if fmt not in ("raw", "hdf5"): - raise ValueError(f"Unsupported format: {fmt}") - self.file_writer._format = fmt + elif command == "capture_frames": + num = int(value) + if num <= 0: + raise ValueError(f"Invalid frame count: {num}") + self.frame_capture.capture(num) + self.pva.post_status("CAPTURE:STATUS", "CAPTURING") + self.pva.post_status("CAPTURE:TOTAL", num) + self.pva.post_status("CAPTURE:PROGRESS", 0) elif command == "set_adc_clock_freq": self.adc.set_clock_freq(float(value)) @@ -293,9 +290,8 @@ class Team1kServer: self.pva.post_status("FRAME_RATE", self._pva_streamer.frame_rate) self.pva.post_status("FRAME_COUNT", self._pva_streamer.frame_count) - self.pva.post_status("FILE_WRITING", self.file_writer.is_enabled) - self.pva.post_status("SCAN_NUMBER", self.file_writer.scan_number) - self.pva.post_status("OUTPUT_DIR", self.file_writer.output_dir) + self.pva.post_status("CAPTURE:STATUS", self.frame_capture.status) + self.pva.post_status("CAPTURE:PROGRESS", self.frame_capture.num_captured) except Exception as e: logger.debug("Status update error: %s", e) @@ -329,8 +325,9 @@ class Team1kServer: ) self._pva_streamer.start() - # Start file writer thread - self.file_writer.start() + # Start frame capture + ZMQ data transfer threads + self.frame_capture.start() + self.data_server.start() # Start status update thread status_thread = threading.Thread(target=self._status_update_loop, @@ -370,8 +367,9 @@ class Team1kServer: if self._pva_streamer: self._pva_streamer.stop() - # Stop file writer - self.file_writer.stop() + # Stop frame capture + data transfer + self.frame_capture.stop() + self.data_server.stop() # Shutdown acquisition subprocess self.acquisition.shutdown() @@ -400,6 +398,8 @@ def main(): help="Parameter file to apply on startup") parser.add_argument('--bellow-port', default=DEFAULT_BELLOW_PORT, help=f"Bellow stage serial port (default: {DEFAULT_BELLOW_PORT})") + parser.add_argument('--zmq-port', type=int, default=DEFAULT_ZMQ_PORT, + help=f"ZMQ data transfer port (default: {DEFAULT_ZMQ_PORT})") parser.add_argument('--log-level', default='INFO', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], help="Logging level (default: INFO)") @@ -417,6 +417,7 @@ def main(): pv_prefix=args.pv_prefix, config_file=args.config, bellow_port=args.bellow_port, + zmq_port=args.zmq_port, ) try: