diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e482470 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +__pycache__/ +*.egg-info/ \ No newline at end of file diff --git a/README.md b/README.md index 3ec3123..56028f6 100644 --- a/README.md +++ b/README.md @@ -2,22 +2,24 @@ Python control library and server for the TEAM1k X-ray detector. -Communicates directly with the detector FPGA via UDP, replacing the previous `k_test` C++ program. Uses EPICS PV Access (PVA) for command/status and ZMQ for bulk data transfer. +Communicates directly with the detector FPGA via UDP. Uses EPICS PV Access (PVA) for the image stream and basic controls (areaDetector-style), and a TCP protocol for the Python client. ## Architecture ``` Client machine Detector machine +--------------+ +-------------------------------+ -| | PVA (commands/status) | team1k-server | -| Client |<---------------------->| Register I/O (UDP:42000) | -| | | PVA server | -| | ZMQ (frame data) | PVA streamer thread | -| |<---------------------->| Frame capture thread | -| | | ZMQ data transfer thread | -+--------------+ | | - | Acquisition subprocess | - | UDP data recv (UDP:41000) | +| | TCP (control+data) | team1k-server | +| Client |<--------------------->| Register I/O (UDP:42000) | +| (Python) | | TCP client server | +| | | Buffered frame capture | ++--------------+ | PVA server (areaDetector) | + | PVA streamer (IMAGE) | +EPICS clients | Power supply monitoring | ++--------------+ | Auto-reconnect | +| pvget/CSS/ | PVA (IMAGE stream) | | +| other tools |<--------------------->| Acquisition subprocess | ++--------------+ | UDP data recv (UDP:41000) | | Frame assembly | | -> locking_shmring | +-------------------------------+ @@ -29,154 +31,202 @@ Client machine Detector machine pip install . ``` -For HDF5 support (optional): +For notebook progress bars (optional): ```bash -pip install ".[hdf5]" +pip install ".[client]" ``` ### Dependencies - `locking_shmring` — shared memory ring buffer (must be installed separately) - `p4p` — EPICS PV Access for Python -- `pyzmq` — ZeroMQ for bulk data transfer +- `pyyaml` — YAML config parsing +- `pyvisa`, `pyvisa-py` — power supply control - `numpy`, `pyserial` +## Configuration + +Create a YAML config file: + +```yaml +detector: + ip: "10.0.0.32" + register_port: 42000 + data_port: 41000 + auto_reconnect: true + reconnect_interval: 5.0 + +adc: + clock_frequency_mhz: 50 + data_delay: 0x180 + order_7to0: 0x89abcdef + order_15to8: 0x01234567 + +digital_signals: + order_7to0: 0x24FFFFFF + order_15to8: 0xF5F310FF + polarity: 0x0014 + +defaults: + exposure_mode: 3 + trigger_mode: external + trigger_polarity: rising + integration_time_ms: 4.0 + +server: + pv_prefix: "TEAM1K:" + client_port: 42010 + +peripherals: + bellow_port: "/dev/CameraBellowStage" + + detector_power: + port: "/dev/DetectorPowerSupply" + voltage_step: 0.2 + channels: + 1: { voltage: 5.0, current: 2.0, ovp: 6.0 } + 2: { voltage: 3.3, current: 1.0, ovp: 4.0 } + + tec: + port: "/dev/TECsPowerSupply" + voltage_step: 0.2 + channels: + 1: { voltage: 12.0, current: 3.0, ovp: 14.0 } +``` + ## Running the server ```bash -team1k-server --detector-ip 10.0.0.32 --log-level INFO +team1k-server --config config.yaml --log-level INFO ``` -All options: +Options: ``` ---detector-ip Detector IP address (default: 10.0.0.32) ---register-port Detector register port (default: 42000) ---data-port Detector data port (default: 41000) ---pv-prefix PVA prefix (default: TEAM1K:) ---zmq-port ZMQ data transfer port (default: 42005) ---config Parameter file to apply on startup ---bellow-port Bellow stage serial port (default: /dev/CameraBellowStage) +--config YAML config file path +--detector-ip Override detector IP address +--pv-prefix Override PVA prefix +--client-port Override TCP client port --log-level DEBUG, INFO, WARNING, ERROR (default: INFO) ``` +The server starts DAQ automatically after initialization (continuous mode). The PVA IMAGE stream is always live. + ## Client usage -### Acquire frames +The Python client connects via TCP. No EPICS/p4p installation needed. + +### Basic usage ```python -from team1k import Client, ExposureModes +from team1k import Client -client = Client(data_host="detector-machine") +client = Client("detector-machine") # Configure -client.set_exposure_mode(ExposureModes.GLOBAL_SHUTTER_CDS) -client.set_integration_time(6.0) # ms +client.exposure_mode = 3 # GlobalCDS +client.trigger_mode = 1 # External +client.integration_time = 4.0 # ms -# One-shot: start DAQ, capture 100 frames, stop DAQ -frames = client.acquire_frames(100) +# Status +print(client.state) # "Acquiring" +print(client.frame_rate) # 500.0 + +# Capture frames (shows progress bar) +frames = client.capture(100) print(frames.shape) # (100, 1024, 1024) print(frames.dtype) # uint16 client.close() ``` -### Manual DAQ control +### Capture modes ```python -client = Client(data_host="detector-machine") +# Fixed number of frames +frames = client.capture(num_frames=100) -client.start_daq() +# Capture for a duration +frames = client.capture(duration=5.0) -# Grab frames from the running stream -batch1 = client.get_frames(50) -batch2 = client.get_frames(50) +# Open-ended capture (start/stop) +stream = client.start_capture() +# ... do something ... +frames = stream.stop() -client.stop_daq() -client.close() +# Disable progress bar +frames = client.capture(100, progress=False) ``` -### Live image monitoring +### Peripherals ```python -client = Client() - -def on_frame(image): - print(f"Frame received: {image.shape}") - -client.monitor_image(on_frame) - -# ... later -client.stop_monitor("IMAGE") -client.close() -``` - -### Status - -```python -client = Client() - -print(client.get_status()) -print(client.is_acquiring()) -print(client.get_frame_rate()) -print(client.get_exposure_mode()) -print(client.get_integration_time()) - -client.close() -``` - -### Other commands - -```python -client.set_trigger_mode(TriggerModes.EXTERNAL) -client.set_adc_clock_freq(60.0) # MHz -client.set_adc_data_delay(0x1A7) -client.load_parameter_file("/path/to/params.txt") -client.set_test_mode(True) # FPGA test pattern -client.reset_connection() - -# Peripherals +# Bellow stage client.insert_detector() client.retract_detector() + +# Detector power client.power_on() client.power_off() +print(client.power_status) # {1: {"voltage": 5.01, "current": 1.2}, ...} + +# TEC +client.tec_on() +client.tec_off() +print(client.tec_status) +``` + +### Advanced + +```python +# Direct register access +value = client.read_register(22) +client.write_register(22, 3) + +# Reconnect after power cycle +client.power_on() +client.reconnect() + +# Full status +print(client.status()) + +# Context manager +with Client("detector-machine") as c: + frames = c.capture(100) ``` ## PV Access interface -All PVs are prefixed with `TEAM1K:` by default. +All PVs are prefixed with `TEAM1K:` by default. Follows areaDetector conventions with RW + `_RBV` readback pattern. -### Status PVs (read-only) +### Acquisition PVs -| PV | Type | Description | -|----|------|-------------| -| `STATUS` | string[] | Server status | -| `ACQUIRING` | bool | DAQ running | -| `FRAME_RATE` | float | Current frame rate (Hz) | -| `FRAME_COUNT` | int | Total frames acquired | -| `EXPOSURE_MODE` | int | Current exposure mode (0-3) | -| `TRIGGER_MODE` | int | Current trigger mode | -| `INTEGRATION_TIME` | float | Integration time (ms) | -| `CAPTURE:STATUS` | string | IDLE / CAPTURING / READY / ERROR | -| `CAPTURE:PROGRESS` | int | Frames captured so far | -| `CAPTURE:TOTAL` | int | Frames requested | -| `IMAGE` | NTNDArray | Live image stream | +| PV | Type | RW | Description | +|----|------|----|-------------| +| `Acquire` / `_RBV` | bool | RW | Start/stop DAQ | +| `AcquireTime` / `_RBV` | float | RW | Integration time (seconds) | +| `ExposureMode` / `_RBV` | enum | RW | Rolling/Rolling with Pause/Global/Global with CDS | +| `TriggerMode` / `_RBV` | enum | RW | Internal/External | +| `ArrayCounter_RBV` | int | RO | Total frames acquired | +| `FrameRate_RBV` | float | RO | Current frame rate (Hz) | +| `DetectorState_RBV` | enum | RO | Disconnected/Initializing/Idle/Acquiring/Error | +| `MaxSizeX_RBV` / `MaxSizeY_RBV` | int | RO | 1024, 1024 | +| `IMAGE` | NTNDArray | RO | Live image stream | -### Command PVs (writable) +### Peripheral PVs -| PV | Type | Description | -|----|------|-------------| -| `CMD:EXPOSURE_MODE` | int | Set exposure mode | -| `CMD:TRIGGER_MODE` | int | Set trigger mode | -| `CMD:INTEGRATION_TIME` | float | Set integration time (ms) | -| `CMD:START_DAQ` | int | 1=start, 0=stop | -| `CMD:CAPTURE` | int | Capture N frames | -| `CMD:ADC_CLOCK_FREQ` | float | Set ADC clock (MHz) | -| `CMD:ADC_DATA_DELAY` | int | Set ADC data delay | -| `CMD:PARAMETER_FILE` | string | Apply parameter file | -| `CMD:RESET` | int | 1=reset connection | -| `CMD:TEST_MODE` | int | 1=enable FPGA test data | +| PV | Type | RW | Description | +|----|------|----|-------------| +| `Bellow:Insert` | bool | RW | Write true to insert | +| `Bellow:Retract` | bool | RW | Write true to retract | +| `Bellow:Position_RBV` | int | RO | Current position | +| `Power:Enable` / `_RBV` | bool | RW | Detector power on/off | +| `Power:ChN:Voltage_RBV` | float | RO | Channel N voltage | +| `Power:ChN:Current_RBV` | float | RO | Channel N current | +| `TEC:Enable` / `_RBV` | bool | RW | TEC power on/off | +| `TEC:ChN:Voltage_RBV` | float | RO | Channel N voltage | +| `TEC:ChN:Current_RBV` | float | RO | Channel N current | ## Package structure @@ -189,17 +239,21 @@ src/team1k/ chip_config.py # Chip constants (1024x1024, packet layout) commands.py # Detector commands (exposure, trigger, etc.) adc.py # Si570 I2C clock programming - parameter_file.py # Parameter file parser acquisition/ # High-throughput data path receiver.py # Acquisition subprocess (UDP -> shmring) - filewriter/ # On-demand frame capture - capture.py # FrameCapture + ZMQ DataTransferServer - pva/ # EPICS PV Access - interface.py # PVA server, command/status PVs + pva/ # EPICS PV Access (areaDetector-style) + interface.py # PVA server, RW PVs with _RBV readbacks streamer.py # shmring -> NTNDArray live stream peripherals/ # Hardware peripherals + power_base.py # VISA power supply base class + power_supply.py # Detector power supply + tec.py # TEC controller bellow_stage.py # Camera bellow stage (serial) - power_supply.py # Power supply control + config.py # YAML configuration loader + state.py # DetectorState enum server.py # Main server entry point - Client.py # PVA + ZMQ client library + tcp_server.py # TCP client handler + tcp_protocol.py # Shared TCP message framing + capture.py # Buffered frame capture (shmring -> TCP) + client.py # Pure-TCP client library ``` diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..0f0d4d5 --- /dev/null +++ b/config.yaml @@ -0,0 +1,80 @@ +adc: + clock_frequency_mhz: 50.0 + data_delay: 384 + order_15to8: 19088743 + order_7to0: 2309737967 +config_version: 1 +defaults: + exposure_mode: 3 + integration_time_ms: 4.0 + trigger_mode: 1 + trigger_polarity: 0 +detector: + auto_reconnect: true + data_port: 41000 + ip: 10.0.0.32 + reconnect_interval: 5.0 + register_port: 42000 +digital: + order_15to8: 4126347519 + order_7to0: 620756991 + polarity: 20 +peripherals: + bellow_stage: + absolute_positioning: false + baudrate: 9600 + inserted_position_um: 0 + port: /dev/CameraBellowStage + retracted_position_um: -2000000 + speed: 100000 + detector_power: + channels: + '1': + channel: 1 + current: 0.0 + ovp: 0.0 + voltage: 0.0 + '2': + channel: 2 + current: 0.0 + ovp: 0.0 + voltage: 0.0 + '3': + channel: 3 + current: 0.0 + ovp: 0.0 + voltage: 0.0 + '4': + channel: 4 + current: 0.0 + ovp: 0.0 + voltage: 0.0 + port: '' + voltage_step: 0.2 + tec: + channels: + '1': + channel: 1 + current: 0.0 + ovp: 0.0 + voltage: 0.0 + '2': + channel: 2 + current: 0.0 + ovp: 0.0 + voltage: 0.0 + '3': + channel: 3 + current: 0.0 + ovp: 0.0 + voltage: 0.0 + '4': + channel: 4 + current: 0.0 + ovp: 0.0 + voltage: 0.0 + port: '' + voltage_step: 0.2 +server: + client_port: 42010 + pv_prefix: 'TEAM1K:' diff --git a/pyproject.toml b/pyproject.toml index 8cacec1..43b15ac 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,22 +4,25 @@ build-backend = "setuptools.build_meta" [project] name = "team1k" -version = "0.0.1" +version = "0.1.0" authors = [ { name="Sebastian Strempfer", email="sstrempfer@anl.gov" }, ] description = "Controls for the TEAM1k detector" readme = "README.md" -requires-python = ">=3.9" +requires-python = ">=3.10" dependencies = [ "numpy >= 1.24.0", "pyserial >= 3.5", "p4p >= 4.1.0", - "pyzmq >= 25.0.0", + "pyyaml >= 6.0", + "pyvisa >= 1.13.0", + "pyvisa-py >= 0.7.0", + "dataclass-mage >= 0.25.1", ] [project.optional-dependencies] -hdf5 = ["h5py >= 3.0.0"] +client = ["tqdm >= 4.60.0"] [project.scripts] team1k-server = "team1k.server:main" diff --git a/src/team1k.egg-info/PKG-INFO b/src/team1k.egg-info/PKG-INFO deleted file mode 100644 index 87798c6..0000000 --- a/src/team1k.egg-info/PKG-INFO +++ /dev/null @@ -1,152 +0,0 @@ -Metadata-Version: 2.4 -Name: team1k -Version: 0.0.1 -Summary: Controls for the TEAM1k detector -Author-email: Sebastian Strempfer -Requires-Python: >=3.9 -Description-Content-Type: text/markdown -Requires-Dist: numpy>=1.24.0 -Requires-Dist: pyserial>=3.5 -Requires-Dist: pydantic>=2.0.0 -Requires-Dist: typing-extensions>=4.5.0 - -# Team1k: X-ray Detector Control System - -This package provides control software for the Team1k X-ray detector system, including: - -1. A server that manages: - - Camera bellow stage (linear motion) - - Power supply control - - Detector control via KTest commands - - Data streaming to EPICS PVA - -2. Client libraries for both synchronous and asynchronous operation - -## Installation - -```bash -pip install -e . -``` - -## Server Usage - -Start the Team1k server: - -```bash -team1k-server -``` - -The server exposes: -- ZMQ REP socket on port 42003 for commands -- ZMQ PUB socket on port 42004 for status updates -- ZMQ SUB socket on port 42005 for data from NetworkInterface.cxx -- EPICS PVA for data streaming (if p4p is installed) - -## Client Usage - -### Basic Example - -```python -from team1k import Client, ExposureModes, TriggerModes - -# Create a client -client = Client('localhost') - -# Control the bellow stage -client.insert_detector() -client.retract_detector() - -# Control the power supply -client.power_supply_on() -voltages = client.read_voltages() -client.power_supply_off() - -# Configure the detector -client.set_exposure_mode(ExposureModes.ROLLING_SHUTTER) -client.set_trigger_mode(TriggerModes.INTERNAL_TRIGGER) -client.set_integration_time(100.0) # ms - -# Start/stop acquisition -client.start_daq() -client.stop_daq() - -# Close the client -client.close() -``` - -### Async Client - -```python -import asyncio -from team1k import AsyncClient - -async def main(): - # Create an async client - client = AsyncClient('localhost') - - # Subscribe to status updates - def status_callback(status): - print(f"Status update: {status}") - - await client.subscribe_to_status(status_callback) - - # Get current status - status = await client.get_status() - print(f"Current status: {status}") - - # Insert detector - await client.insert_detector() - - # Close client - await client.close() - -asyncio.run(main()) -``` - -## Example Scripts - -The `examples` directory contains sample scripts: - -- `client_demo.py`: Demonstrates synchronous and asynchronous client usage -- `status_monitor.py`: Real-time monitoring of system status - -## Server Commands - -The server accepts the following commands: - -| Command | Description | -|---------|-------------| -| `insert` | Insert the detector into the beam | -| `retract` | Retract the detector from the beam | -| `power_on` | Turn on the detector power supply | -| `power_off` | Turn off the detector power supply | -| `read_voltages` | Read the voltage values from the power supply | -| `get_status` | Get the current status of all devices | -| `subscribe PORT` | Subscribe to status updates with a callback port | -| `unsubscribe` | Unsubscribe from status updates | -| `kill` | Shut down the server | - -Additionally, all commands from the KTestCommand enum are supported. - -## Architecture - -The system uses a distributed architecture: - -1. **Team1k Server**: - - Manages all devices - - Provides command interface - - Publishes status updates - - Streams data to EPICS PVA - -2. **NetworkInterface.cxx**: - - Communicates with the detector hardware - - Streams data to a ZMQ socket - -3. **Team1k Client**: - - Connects to the server - - Sends commands - - Receives status updates - -4. **EPICS PVA**: - - Provides standard interface for data acquisition systems - - Allows integration with other EPICS tools diff --git a/src/team1k.egg-info/SOURCES.txt b/src/team1k.egg-info/SOURCES.txt deleted file mode 100644 index 62a207e..0000000 --- a/src/team1k.egg-info/SOURCES.txt +++ /dev/null @@ -1,12 +0,0 @@ -README.md -pyproject.toml -src/team1k/Client.py -src/team1k/KTestClient.py -src/team1k/__init__.py -src/team1k/server.py -src/team1k.egg-info/PKG-INFO -src/team1k.egg-info/SOURCES.txt -src/team1k.egg-info/dependency_links.txt -src/team1k.egg-info/entry_points.txt -src/team1k.egg-info/requires.txt -src/team1k.egg-info/top_level.txt \ No newline at end of file diff --git a/src/team1k.egg-info/dependency_links.txt b/src/team1k.egg-info/dependency_links.txt deleted file mode 100644 index 8b13789..0000000 --- a/src/team1k.egg-info/dependency_links.txt +++ /dev/null @@ -1 +0,0 @@ - diff --git a/src/team1k.egg-info/entry_points.txt b/src/team1k.egg-info/entry_points.txt deleted file mode 100644 index 0c1f31c..0000000 --- a/src/team1k.egg-info/entry_points.txt +++ /dev/null @@ -1,2 +0,0 @@ -[console_scripts] -team1k-server = team1k.server:main diff --git a/src/team1k.egg-info/requires.txt b/src/team1k.egg-info/requires.txt deleted file mode 100644 index 19d5195..0000000 --- a/src/team1k.egg-info/requires.txt +++ /dev/null @@ -1,4 +0,0 @@ -numpy>=1.24.0 -pyserial>=3.5 -pydantic>=2.0.0 -typing-extensions>=4.5.0 diff --git a/src/team1k.egg-info/top_level.txt b/src/team1k.egg-info/top_level.txt deleted file mode 100644 index 33f5e67..0000000 --- a/src/team1k.egg-info/top_level.txt +++ /dev/null @@ -1 +0,0 @@ -team1k diff --git a/src/team1k/Client.py b/src/team1k/Client.py deleted file mode 100644 index f945da4..0000000 --- a/src/team1k/Client.py +++ /dev/null @@ -1,406 +0,0 @@ -#!/usr/bin/env python3 -""" -Team1k Client — PVA + ZMQ interface to the Team1k detector server. - -Commands and status use EPICS PV Access (p4p). -Bulk frame retrieval uses ZMQ for efficient transfer over the network. - -Typical usage:: - - client = Client() - client.set_exposure_mode(ExposureModes.GLOBAL_SHUTTER_CDS) - client.set_integration_time(6.0) - - # One-shot: start DAQ, grab 100 frames, stop DAQ - frames = client.acquire_frames(100) - print(frames.shape) # (100, 1024, 1024) - - # Or manual control - client.start_daq() - frames = client.get_frames(50) - client.stop_daq() - - client.close() -""" - -import enum -import json -import time -import logging -from typing import Any, Callable - -import numpy as np -import zmq - -from p4p.client.thread import Context, Subscription - -logger = logging.getLogger(__name__) - -DEFAULT_PV_PREFIX = "TEAM1K:" -DEFAULT_ZMQ_PORT = 42005 - - -class ExposureModes(enum.IntEnum): - """Detector exposure modes.""" - ROLLING_SHUTTER = 0 - GLOBAL_SHUTTER = 1 - GLOBAL_SHUTTER_FLUSH = 2 - GLOBAL_SHUTTER_CDS = 3 - - -class TriggerModes(enum.IntEnum): - """Detector trigger modes.""" - INTERNAL = 0 - EXTERNAL = 1 - - -class CommandError(Exception): - """Raised when a PVA command put or ZMQ transfer fails.""" - pass - - -class Client: - """ - Client for the Team1k detector server. - - Uses PVA for commands/status and ZMQ for bulk frame data transfer. - - Args: - prefix: PV name prefix (default: "TEAM1K:"). - data_host: Hostname/IP of the server's ZMQ data port. - data_port: ZMQ port for frame data transfer. - timeout: Default timeout in seconds for PVA operations. - """ - - def __init__(self, prefix: str = DEFAULT_PV_PREFIX, - data_host: str = "localhost", - data_port: int = DEFAULT_ZMQ_PORT, - timeout: float = 5.0): - self._prefix = prefix - self._data_host = data_host - self._data_port = data_port - self._timeout = timeout - self._ctx = Context("pva") - self._zmq_ctx = zmq.Context() - self._subscriptions: dict[str, Subscription] = {} - - def _pv(self, name: str) -> str: - """Build full PV name.""" - return f"{self._prefix}{name}" - - def _put(self, pv_suffix: str, value: Any) -> None: - """Put a value to a command PV.""" - pv_name = self._pv(pv_suffix) - try: - self._ctx.put(pv_name, value, timeout=self._timeout) - except Exception as e: - raise CommandError(f"Failed to put {pv_name}={value}: {e}") from e - - def _get(self, pv_suffix: str) -> Any: - """Get the current value of a PV.""" - pv_name = self._pv(pv_suffix) - try: - result = self._ctx.get(pv_name, timeout=self._timeout) - return result.value if hasattr(result, 'value') else result - except Exception as e: - raise CommandError(f"Failed to get {pv_name}: {e}") from e - - # --- Frame retrieval (the primary data interface) --- - - def get_frames(self, num_frames: int, timeout: float = 120.0) -> np.ndarray: - """ - Capture frames from the running DAQ and transfer them. - - DAQ must already be running. This triggers a server-side capture of - N consecutive frames, waits for completion, then fetches the data - over ZMQ. - - Args: - num_frames: Number of frames to capture. - timeout: Maximum time to wait for capture + transfer (seconds). - - Returns: - numpy array of shape (num_frames, ny, nx) with dtype uint16. - - Raises: - CommandError: If capture fails, times out, or transfer errors. - """ - # Trigger server-side capture - self._put("CMD:CAPTURE", num_frames) - - # Poll capture status until READY - deadline = time.monotonic() + timeout - while time.monotonic() < deadline: - status = self._get("CAPTURE:STATUS") - if status == "READY": - break - if status == "ERROR": - raise CommandError("Server-side capture failed") - time.sleep(0.1) - else: - raise CommandError( - f"Capture timed out after {timeout}s " - f"(status={self._get('CAPTURE:STATUS')})" - ) - - # Fetch data over ZMQ - remaining = deadline - time.monotonic() - return self._fetch_zmq(timeout=max(remaining, 5.0)) - - def acquire_frames(self, num_frames: int, timeout: float = 120.0) -> np.ndarray: - """ - Start DAQ, capture N frames, stop DAQ, and return the data. - - Convenience method for one-shot acquisitions. Equivalent to:: - - client.start_daq() - frames = client.get_frames(num_frames) - client.stop_daq() - - Args: - num_frames: Number of frames to capture. - timeout: Maximum time to wait (seconds). - - Returns: - numpy array of shape (num_frames, ny, nx) with dtype uint16. - """ - self.start_daq() - try: - return self.get_frames(num_frames, timeout=timeout) - finally: - self.stop_daq() - - def _fetch_zmq(self, timeout: float = 60.0) -> np.ndarray: - """Fetch captured frame data from the server via ZMQ.""" - sock = self._zmq_ctx.socket(zmq.REQ) - sock.setsockopt(zmq.RCVTIMEO, int(timeout * 1000)) - sock.setsockopt(zmq.LINGER, 0) - sock.connect(f"tcp://{self._data_host}:{self._data_port}") - - try: - sock.send(b"FETCH") - parts = sock.recv_multipart() - - if len(parts) != 2: - raise CommandError(f"Unexpected ZMQ response: {len(parts)} parts") - - header = json.loads(parts[0]) - if "error" in header: - raise CommandError(f"Server error: {header['error']}") - - data = parts[1] - n = header["num_frames"] - ny = header["ny"] - nx = header["nx"] - dtype = np.dtype(header.get("dtype", "uint16")) - - expected_bytes = n * ny * nx * dtype.itemsize - if len(data) != expected_bytes: - raise CommandError( - f"Data size mismatch: got {len(data)}, " - f"expected {expected_bytes}" - ) - - arr = np.frombuffer(data, dtype=dtype).copy() - return arr.reshape(n, ny, nx) - - except zmq.Again: - raise CommandError(f"ZMQ fetch timed out after {timeout}s") - finally: - sock.close() - - # --- Detector commands --- - - def set_exposure_mode(self, mode: ExposureModes | int) -> None: - """Set the detector exposure mode (0-3).""" - self._put("CMD:EXPOSURE_MODE", int(mode)) - - def set_trigger_mode(self, mode: TriggerModes | int) -> None: - """Set the trigger mode (0=internal, 1=external).""" - self._put("CMD:TRIGGER_MODE", int(mode)) - - def set_integration_time(self, time_ms: float) -> None: - """Set the integration time in milliseconds.""" - self._put("CMD:INTEGRATION_TIME", time_ms) - - def start_daq(self) -> None: - """Start data acquisition.""" - self._put("CMD:START_DAQ", 1) - - def stop_daq(self) -> None: - """Stop data acquisition.""" - self._put("CMD:START_DAQ", 0) - - def set_adc_clock_freq(self, freq_mhz: float) -> None: - """Set ADC clock frequency in MHz (50-100).""" - self._put("CMD:ADC_CLOCK_FREQ", freq_mhz) - - def set_adc_data_delay(self, delay: int) -> None: - """Set ADC data delay.""" - self._put("CMD:ADC_DATA_DELAY", delay) - - def load_parameter_file(self, file_path: str) -> None: - """Apply a parameter file on the server.""" - self._put("CMD:PARAMETER_FILE", file_path) - - def reset_connection(self) -> None: - """Reset the detector connection (stop DAQ, reinitialize).""" - self._put("CMD:RESET", 1) - - def set_test_mode(self, enable: bool) -> None: - """Enable or disable FPGA test data generation.""" - self._put("CMD:TEST_MODE", 1 if enable else 0) - - # --- Peripheral commands --- - - def insert_detector(self) -> None: - """Insert the detector (move bellow stage).""" - self._put("BELLOW:CMD", 1) - - def retract_detector(self) -> None: - """Retract the detector (move bellow stage).""" - self._put("BELLOW:CMD", 0) - - def power_on(self) -> None: - """Turn on the detector power supply.""" - self._put("POWER:CMD", 1) - - def power_off(self) -> None: - """Turn off the detector power supply.""" - self._put("POWER:CMD", 0) - - # --- Status getters --- - - def get_status(self) -> list[str]: - """Get server status strings.""" - val = self._get("STATUS") - return list(val) if not isinstance(val, str) else [val] - - def is_acquiring(self) -> bool: - """Check if DAQ is running.""" - return bool(self._get("ACQUIRING")) - - def get_frame_rate(self) -> float: - """Get current frame rate in Hz.""" - return float(self._get("FRAME_RATE")) - - def get_frame_count(self) -> int: - """Get total frames acquired.""" - return int(self._get("FRAME_COUNT")) - - def get_exposure_mode(self) -> int: - """Get current exposure mode.""" - return int(self._get("EXPOSURE_MODE")) - - def get_trigger_mode(self) -> int: - """Get current trigger mode.""" - return int(self._get("TRIGGER_MODE")) - - def get_integration_time(self) -> float: - """Get current integration time in ms.""" - return float(self._get("INTEGRATION_TIME")) - - def get_capture_status(self) -> str: - """Get capture status (IDLE, CAPTURING, READY, ERROR).""" - return str(self._get("CAPTURE:STATUS")) - - def get_capture_progress(self) -> tuple[int, int]: - """Get capture progress as (captured, total).""" - progress = int(self._get("CAPTURE:PROGRESS")) - total = int(self._get("CAPTURE:TOTAL")) - return progress, total - - def get_bellow_position(self) -> int: - """Get bellow stage position.""" - return int(self._get("BELLOW:POSITION")) - - def get_power_voltages(self) -> np.ndarray: - """Get power supply voltage readings.""" - return np.asarray(self._get("POWER:VOLTAGES")) - - # --- Monitors (live subscriptions) --- - - def monitor_image(self, callback: Callable[[np.ndarray], None]) -> None: - """ - Subscribe to live image updates. - - The callback receives a 2D numpy array for each new frame. - Only one image monitor can be active at a time. - - Args: - callback: Function called with numpy array for each frame. - """ - self.stop_monitor("IMAGE") - - pv_name = self._pv("IMAGE") - - def _on_image(value): - try: - arr = value.value - if hasattr(value, 'dimension') and value.dimension: - shape = tuple(d.size for d in value.dimension) - arr = arr.reshape(shape) - callback(arr) - except Exception as e: - logger.debug("Image monitor error: %s", e) - - self._subscriptions["IMAGE"] = self._ctx.monitor(pv_name, _on_image) - - def monitor_status(self, pv_name: str, - callback: Callable[[Any], None]) -> None: - """ - Subscribe to a status PV for live updates. - - Args: - pv_name: PV suffix (e.g., "FRAME_RATE", "ACQUIRING"). - callback: Function called with the new value on each update. - """ - self.stop_monitor(pv_name) - - full_name = self._pv(pv_name) - - def _on_update(value): - try: - val = value.value if hasattr(value, 'value') else value - callback(val) - except Exception as e: - logger.debug("Monitor %s error: %s", pv_name, e) - - self._subscriptions[pv_name] = self._ctx.monitor(full_name, _on_update) - - def stop_monitor(self, name: str) -> None: - """Stop a specific monitor subscription.""" - sub = self._subscriptions.pop(name, None) - if sub is not None: - sub.close() - - def stop_all_monitors(self) -> None: - """Stop all active monitor subscriptions.""" - for sub in self._subscriptions.values(): - sub.close() - self._subscriptions.clear() - - def close(self) -> None: - """Close the client and release all resources.""" - self.stop_all_monitors() - self._ctx.close() - self._zmq_ctx.term() - - -# Example usage -if __name__ == "__main__": - logging.basicConfig(level=logging.INFO) - - client = Client(data_host="detector-machine") - try: - print("Status:", client.get_status()) - - # One-shot acquisition of 10 frames - client.set_exposure_mode(ExposureModes.GLOBAL_SHUTTER_CDS) - client.set_integration_time(6.0) - frames = client.acquire_frames(10) - print(f"Got {frames.shape[0]} frames, shape={frames.shape}") - except CommandError as e: - print(f"Error: {e}") - finally: - client.close() diff --git a/src/team1k/__init__.py b/src/team1k/__init__.py index 03f35f8..a2deffa 100644 --- a/src/team1k/__init__.py +++ b/src/team1k/__init__.py @@ -1,8 +1,7 @@ -from .Client import Client, ExposureModes, TriggerModes, CommandError +from .client import Client +from .state import DetectorState __all__ = [ 'Client', - 'ExposureModes', - 'TriggerModes', - 'CommandError', + 'DetectorState', ] diff --git a/src/team1k/capture.py b/src/team1k/capture.py new file mode 100644 index 0000000..6100079 --- /dev/null +++ b/src/team1k/capture.py @@ -0,0 +1,333 @@ +""" +Buffered frame capture with TCP streaming. + +Captures frames from the shared memory ring buffer to a temp file +(at detector speed), then streams them to the client over TCP +(at network speed). The temp file buffers the speed difference. + +This capture is completely independent of the PVA IMAGE stream — +it uses its own RingBufferReader. +""" + +import os +import time +import logging +import tempfile +import threading +from pathlib import Path + +import numpy as np + +from .detector.chip_config import ChipConfig, TEAM1K_CONFIG +from .tcp_protocol import send_msg, send_bytes + +logger = logging.getLogger(__name__) + + +class CaptureHandle: + """Handle for an in-progress open-ended (start/stop) capture.""" + + def __init__(self): + self.stop_event = threading.Event() + self.num_captured = 0 + self.error: str | None = None + + +class BufferedCapture: + """ + Captures frames from shmring with temp file buffering. + + Uses its own RingBufferReader, independent of the PVA streamer. + Only one capture can run at a time (enforced externally by tcp_server). + """ + + def __init__(self, ring_name: str, chip_config: ChipConfig = TEAM1K_CONFIG): + self._ring_name = ring_name + self._config = chip_config + self._frame_bytes = chip_config.image_nx * chip_config.image_ny * 2 # uint16 + + def capture_to_socket(self, sock, num_frames: int) -> int: + """ + Capture N frames, stream them to the client socket. + + Phase 1 (writer thread): shmring → temp file (fast, detector speed) + Phase 2 (this thread): temp file → TCP socket (network speed) + + Both phases run concurrently; the temp file acts as buffer. + + Returns: + Number of frames sent to client. + """ + from locking_shmring import RingBufferReader + + # Send header + send_msg(sock, { + "ok": True, + "num_frames": num_frames, + "nx": self._config.image_nx, + "ny": self._config.image_ny, + "dtype": "uint16", + "frame_bytes": self._frame_bytes, + }) + + fd, temp_path = tempfile.mkstemp(suffix=".raw", prefix="team1k_cap_") + os.close(fd) + + writer_done = threading.Event() + frames_written = [0] # mutable counter shared with writer thread + writer_error = [None] + + def _writer(): + """Read frames from shmring → temp file.""" + try: + reader = RingBufferReader(self._ring_name) + with open(temp_path, "wb") as f: + for slot in reader.iter_frames(timeout_ms=5000): + with slot: + frame = slot.data_as_numpy( + (self._config.image_ny, self._config.image_nx), + np.uint16, + ) + f.write(frame.tobytes()) + frames_written[0] += 1 + if frames_written[0] >= num_frames: + break + reader.close() + except Exception as e: + writer_error[0] = str(e) + logger.error("Capture writer error: %s", e) + finally: + writer_done.set() + + # Start writer thread + writer = threading.Thread(target=_writer, daemon=True, + name="team1k-cap-writer") + writer.start() + + # Stream from temp file → socket as data becomes available + frames_sent = 0 + try: + with open(temp_path, "rb") as f: + while frames_sent < num_frames: + # Wait for writer to produce more data + while frames_written[0] <= frames_sent: + if writer_done.is_set(): + break + time.sleep(0.001) + + if frames_written[0] <= frames_sent and writer_done.is_set(): + break + + # Read and send available frames + while frames_sent < frames_written[0] and frames_sent < num_frames: + frame_data = f.read(self._frame_bytes) + if len(frame_data) != self._frame_bytes: + break + send_bytes(sock, frame_data) + frames_sent += 1 + + except (ConnectionError, BrokenPipeError) as e: + logger.warning("Client disconnected during capture: %s", e) + finally: + writer_done.wait(timeout=10) + try: + Path(temp_path).unlink(missing_ok=True) + except Exception: + pass + + # Send completion + error = writer_error[0] + send_msg(sock, { + "done": True, + "num_frames": frames_sent, + "error": error, + }) + + if error: + logger.error("Capture completed with error: %s (sent %d/%d)", + error, frames_sent, num_frames) + else: + logger.info("Capture complete: sent %d/%d frames", + frames_sent, num_frames) + + return frames_sent + + def capture_duration_to_socket(self, sock, duration_sec: float) -> int: + """ + Capture frames for a specified duration, stream to client. + + Since frame count isn't known upfront, each frame is sent with + an 8-byte length prefix. A completion message signals the end. + + Returns: + Number of frames sent. + """ + from locking_shmring import RingBufferReader + + # Send header with unknown frame count + send_msg(sock, { + "ok": True, + "num_frames": -1, + "nx": self._config.image_nx, + "ny": self._config.image_ny, + "dtype": "uint16", + "frame_bytes": self._frame_bytes, + }) + + fd, temp_path = tempfile.mkstemp(suffix=".raw", prefix="team1k_dur_") + os.close(fd) + + writer_done = threading.Event() + frames_written = [0] + + def _writer(): + try: + reader = RingBufferReader(self._ring_name) + deadline = time.monotonic() + duration_sec + with open(temp_path, "wb") as f: + for slot in reader.iter_frames(timeout_ms=1000): + with slot: + frame = slot.data_as_numpy( + (self._config.image_ny, self._config.image_nx), + np.uint16, + ) + f.write(frame.tobytes()) + frames_written[0] += 1 + if time.monotonic() >= deadline: + break + reader.close() + except Exception as e: + logger.error("Duration capture writer error: %s", e) + finally: + writer_done.set() + + writer = threading.Thread(target=_writer, daemon=True, + name="team1k-dur-writer") + writer.start() + + frames_sent = 0 + try: + with open(temp_path, "rb") as f: + while True: + while frames_written[0] <= frames_sent: + if writer_done.is_set(): + break + time.sleep(0.001) + + if frames_written[0] <= frames_sent and writer_done.is_set(): + break + + while frames_sent < frames_written[0]: + frame_data = f.read(self._frame_bytes) + if len(frame_data) != self._frame_bytes: + break + send_bytes(sock, frame_data) + frames_sent += 1 + + except (ConnectionError, BrokenPipeError) as e: + logger.warning("Client disconnected during capture: %s", e) + finally: + writer_done.wait(timeout=max(duration_sec + 5, 10)) + try: + Path(temp_path).unlink(missing_ok=True) + except Exception: + pass + + send_msg(sock, {"done": True, "num_frames": frames_sent}) + logger.info("Duration capture complete: %d frames in %.1fs", + frames_sent, duration_sec) + return frames_sent + + def start_streaming_to_socket(self, sock) -> CaptureHandle: + """ + Start open-ended capture. Returns a handle; call handle.stop_event.set() + to stop. Frames are sent with length prefix since count is unknown. + + Returns: + CaptureHandle that the caller can use to stop the capture. + """ + handle = CaptureHandle() + + send_msg(sock, { + "ok": True, + "num_frames": -1, + "nx": self._config.image_nx, + "ny": self._config.image_ny, + "dtype": "uint16", + "frame_bytes": self._frame_bytes, + }) + + def _stream(): + from locking_shmring import RingBufferReader + + fd, temp_path = tempfile.mkstemp(suffix=".raw", + prefix="team1k_stream_") + os.close(fd) + + writer_done = threading.Event() + frames_written = [0] + + def _writer(): + try: + reader = RingBufferReader(self._ring_name) + with open(temp_path, "wb") as f: + for slot in reader.iter_frames(timeout_ms=1000): + with slot: + frame = slot.data_as_numpy( + (self._config.image_ny, + self._config.image_nx), + np.uint16, + ) + f.write(frame.tobytes()) + frames_written[0] += 1 + if handle.stop_event.is_set(): + break + reader.close() + except Exception as e: + handle.error = str(e) + logger.error("Streaming writer error: %s", e) + finally: + writer_done.set() + + writer = threading.Thread(target=_writer, daemon=True, + name="team1k-stream-writer") + writer.start() + + frames_sent = 0 + try: + with open(temp_path, "rb") as f: + while True: + while frames_written[0] <= frames_sent: + if writer_done.is_set(): + break + time.sleep(0.001) + + if frames_written[0] <= frames_sent and writer_done.is_set(): + break + + while frames_sent < frames_written[0]: + frame_data = f.read(self._frame_bytes) + if len(frame_data) != self._frame_bytes: + break + send_bytes(sock, frame_data) + frames_sent += 1 + + except (ConnectionError, BrokenPipeError) as e: + logger.warning("Client disconnected during streaming: %s", e) + finally: + handle.stop_event.set() + writer_done.wait(timeout=10) + try: + Path(temp_path).unlink(missing_ok=True) + except Exception: + pass + + handle.num_captured = frames_sent + send_msg(sock, {"done": True, "num_frames": frames_sent}) + logger.info("Streaming capture complete: %d frames", frames_sent) + + stream_thread = threading.Thread(target=_stream, daemon=True, + name="team1k-stream") + stream_thread.start() + + return handle diff --git a/src/team1k/client.py b/src/team1k/client.py new file mode 100644 index 0000000..8342326 --- /dev/null +++ b/src/team1k/client.py @@ -0,0 +1,460 @@ +""" +Team1k Client — Pure-TCP client for the Team1k detector server. + +No EPICS/p4p/zmq dependencies. Only requires numpy (and optionally tqdm +for progress bars in notebooks). + +Usage:: + + from team1k import Client + + client = Client("detector-host") + + # Configure + client.exposure_mode = 3 # GlobalCDS + client.trigger_mode = 1 # External + client.integration_time = 4.0 # ms + + # Capture frames + frames = client.capture(100) + print(frames.shape) # (100, 1024, 1024) + + # Or capture by duration + frames = client.capture(duration=5.0) + + # Open-ended capture + stream = client.start_capture() + # ... later ... + frames = stream.stop() + + client.close() +""" + +import io +import socket +import logging +from typing import Any + +import numpy as np +from .config import ExposureMode, TriggerMode, TriggerPolarity + +from .tcp_protocol import recv_bytes_or_msg, send_msg, recv_msg, recv_exact + +logger = logging.getLogger(__name__) + +# Default server port (must match server's config.client_port) +DEFAULT_PORT = 42010 + + +class CaptureStream: + """ + Handle for an open-ended streaming capture. + + Created by Client.start_capture(). Call stop() to end the capture + and retrieve all frames as a numpy array. + """ + + def __init__(self, sock: socket.socket, header: dict): + self._sock = sock + self.nx = header["nx"] + self.ny = header["ny"] + self.frame_bytes = header["frame_bytes"] + self._dtype = np.dtype(header.get("dtype", "uint16")) + self._frames: list[np.ndarray] = [] + self._stopped = False + + def stop(self, progress: bool = True) -> np.ndarray: + """ + Stop the capture and return all captured frames. + + Args: + progress: Show progress bar during final data reception. + + Returns: + numpy array of shape (N, ny, nx). + """ + if self._stopped: + if self._frames: + return np.stack(self._frames) + return np.empty((0, self.ny, self.nx), dtype=self._dtype) + + # Send stop command — the server checks for this + send_msg(self._sock, {"cmd": "stop_capture"}) + + # Receive remaining frames until done message + self._receive_frames(progress=progress) + self._stopped = True + + if self._frames: + return np.stack(self._frames) + return np.empty((0, self.ny, self.nx), dtype=self._dtype) + + def _receive_frames(self, progress: bool = True) -> None: + """Receive frames until a done message arrives.""" + pbar = None + if progress: + pbar = _make_pbar(desc="Receiving", unit="frame") + + while True: + # Frames come with 8-byte length prefix (unknown count) + data = recv_bytes_or_msg(self._sock) + + # Check if it's a JSON message (done signal) + if isinstance(data, dict): + if data.get("done"): + break + continue + + frame = np.frombuffer(data, dtype=self._dtype).reshape( + self.ny, self.nx, + ).copy() + self._frames.append(frame) + + if pbar is not None: + pbar.update(1) + + if pbar is not None: + pbar.close() + + +class Client: + """ + Client for the Team1k detector server. + + Uses a TCP connection for all control and data transfer. + No EPICS/p4p/zmq dependencies required. + + Args: + host: Server hostname or IP address. + port: TCP port (default: 42010). + timeout: Connection timeout in seconds. + """ + + def __init__(self, host: str = "localhost", port: int = DEFAULT_PORT, + timeout: float = 5.0): + self._sock = socket.create_connection((host, port), timeout=timeout) + self._sock.settimeout(timeout) + self._host = host + self._port = port + self._timeout = timeout + + # --- Properties --- + + @property + def exposure_mode(self) -> ExposureMode: + """Current exposure mode (0=Rolling, 1=Rolling with Pause, 2=Global Shutter, 3=Global Shutter CDS).""" + return ExposureMode(self._get("exposure_mode")) + + @exposure_mode.setter + def exposure_mode(self, value: ExposureMode | int) -> None: + self._set("exposure_mode", int(value)) + + @property + def trigger_mode(self) -> TriggerMode: + """Current trigger mode (0=Internal, 1=External).""" + return TriggerMode(self._get("trigger_mode")) + + @trigger_mode.setter + def trigger_mode(self, value: TriggerMode | int) -> None: + self._set("trigger_mode", int(value)) + + @property + def trigger_polarity(self) -> TriggerPolarity: + """Current trigger polarity (0=Rising, 1=Falling).""" + return TriggerPolarity(self._get("trigger_polarity")) + + @trigger_polarity.setter + def trigger_polarity(self, value: TriggerPolarity | int) -> None: + self._set("trigger_polarity", int(value)) + + @property + def integration_time(self) -> float: + """Integration time in milliseconds.""" + return self._get("integration_time") + + @integration_time.setter + def integration_time(self, value: float) -> None: + self._set("integration_time", value) + + @property + def frame_rate(self) -> float: + """Current frame rate in Hz (read-only).""" + return self._get("frame_rate") + + @property + def frame_count(self) -> int: + """Total frames acquired (read-only).""" + return self._get("frame_count") + + @property + def state(self) -> str: + """Detector state (Disconnected/Initializing/Idle/Acquiring/Error).""" + return self.status()["state"] + + # --- Capture --- + + def capture(self, num_frames: int | None = None, + duration: float | None = None, + progress: bool = True) -> np.ndarray: + """ + Capture frames from the detector. + + Specify either num_frames or duration (not both). The DAQ must be + running (it runs continuously by default). + + Args: + num_frames: Number of frames to capture. + duration: Duration in seconds to capture. + progress: Show tqdm progress bar (default True). + + Returns: + numpy array of shape (N, ny, nx), dtype uint16. + """ + if num_frames is not None and duration is not None: + raise ValueError("Specify num_frames or duration, not both") + if num_frames is None and duration is None: + raise ValueError("Specify num_frames or duration") + + if num_frames is not None: + send_msg(self._sock, {"cmd": "capture", "num_frames": num_frames}) + else: + send_msg(self._sock, {"cmd": "capture", "duration_sec": duration}) + + # Receive header + self._sock.settimeout(None) # no timeout during capture + try: + header = recv_msg(self._sock) + except Exception: + self._sock.settimeout(self._timeout) + raise + + if not header.get("ok"): + self._sock.settimeout(self._timeout) + raise RuntimeError(header.get("error", "Capture failed")) + + n = header["num_frames"] + nx = header["nx"] + ny = header["ny"] + frame_bytes = header["frame_bytes"] + dtype = np.dtype(header.get("dtype", "uint16")) + + try: + if n > 0: + # Known frame count: frames sent as raw bytes, no length prefix + return self._recv_fixed_frames( + n, nx, ny, frame_bytes, dtype, progress, + ) + else: + # Unknown frame count (duration): frames with length prefix + return self._recv_variable_frames( + nx, ny, dtype, progress, + ) + finally: + self._sock.settimeout(self._timeout) + + def _recv_fixed_frames(self, n: int, nx: int, ny: int, + frame_bytes: int, dtype: np.dtype, + progress: bool) -> np.ndarray: + """Receive a known number of frames (no length prefix per frame).""" + frames = np.empty((n, ny, nx), dtype=dtype) + + iterator = range(n) + pbar = None + if progress: + pbar = _make_pbar(total=n, desc="Capturing", unit="frame") + + for i in iterator: + raw = recv_bytes_or_msg(self._sock, frame_bytes) + if not isinstance(raw, bytes): + raise ValueError(f"Expected frame data but got message: {raw}") + frames[i] = np.frombuffer(raw, dtype=dtype).reshape(ny, nx) + if pbar is not None: + pbar.update(1) + + if pbar is not None: + pbar.close() + + # Receive completion message + done = recv_msg(self._sock) + if done.get("error"): + logger.warning("Capture completed with error: %s", done["error"]) + + return frames + + def _recv_variable_frames(self, nx: int, ny: int, dtype: np.dtype, + progress: bool) -> np.ndarray: + """Receive frames with length prefix until done message.""" + import json + + frame_list = [] + pbar = None + if progress: + pbar = _make_pbar(desc="Capturing", unit="frame") + + while True: + data = recv_bytes_or_msg(self._sock) + + # Check for done message (JSON) + if isinstance(data, dict): + if data.get("done"): + break + continue + + frame = np.frombuffer(data, dtype=dtype).reshape(ny, nx).copy() + frame_list.append(frame) + if pbar is not None: + pbar.update(1) + + if pbar is not None: + pbar.close() + + if frame_list: + return np.stack(frame_list) + return np.empty((0, ny, nx), dtype=dtype) + + def start_capture(self) -> CaptureStream: + """ + Start an open-ended streaming capture. + + Returns a CaptureStream handle. Call handle.stop() to end the + capture and get the frames as a numpy array. + + Example:: + + stream = client.start_capture() + time.sleep(5) + frames = stream.stop() + """ + send_msg(self._sock, {"cmd": "start_capture"}) + + self._sock.settimeout(None) + header = recv_msg(self._sock) + + if not header.get("ok"): + self._sock.settimeout(self._timeout) + raise RuntimeError(header.get("error", "Start capture failed")) + + return CaptureStream(self._sock, header) + + # --- Peripherals --- + + def insert_detector(self) -> None: + """Insert the detector (move bellow stage).""" + self._action("insert_detector") + + def retract_detector(self) -> None: + """Retract the detector (move bellow stage).""" + self._action("retract_detector") + + def power_on(self) -> None: + """Turn on the detector power supply.""" + self._action("power_on") + + def power_off(self) -> None: + """Turn off the detector power supply.""" + self._action("power_off") + + def tec_on(self) -> None: + """Turn on the TEC power supply.""" + self._action("tec_on") + + def tec_off(self) -> None: + """Turn off the TEC power supply.""" + self._action("tec_off") + + @property + def power_status(self) -> dict[int, dict[str, float]]: + """Detector power supply channel readings: {ch: {voltage, current}}.""" + send_msg(self._sock, {"cmd": "power_status"}) + resp = recv_msg(self._sock) + if not resp.get("ok"): + raise RuntimeError(resp.get("error", "Failed")) + return {int(k): v for k, v in resp["channels"].items()} + + @property + def tec_status(self) -> dict[int, dict[str, float]]: + """TEC power supply channel readings: {ch: {voltage, current}}.""" + send_msg(self._sock, {"cmd": "tec_status"}) + resp = recv_msg(self._sock) + if not resp.get("ok"): + raise RuntimeError(resp.get("error", "Failed")) + return {int(k): v for k, v in resp["channels"].items()} + + # --- Register access --- + + def read_register(self, address: int) -> int: + """Read a detector register directly.""" + send_msg(self._sock, {"cmd": "register_read", "address": address}) + resp = recv_msg(self._sock) + if not resp.get("ok"): + raise RuntimeError(resp.get("error", "Register read failed")) + return resp["value"] + + def write_register(self, address: int, value: int) -> None: + """Write a detector register directly.""" + send_msg(self._sock, { + "cmd": "register_write", "address": address, "value": value, + }) + resp = recv_msg(self._sock) + if not resp.get("ok"): + raise RuntimeError(resp.get("error", "Register write failed")) + + # --- Status / reconnect --- + + def status(self) -> dict: + """Get full server status as a dict.""" + send_msg(self._sock, {"cmd": "status"}) + return recv_msg(self._sock) + + def reconnect(self) -> None: + """Tell the server to reconnect to the detector (after power cycle).""" + send_msg(self._sock, {"cmd": "reconnect"}) + resp = recv_msg(self._sock) + if not resp.get("ok"): + raise RuntimeError(resp.get("error", "Reconnect failed")) + + # --- Internal --- + + def _get(self, key: str) -> Any: + send_msg(self._sock, {"cmd": "get", "key": key}) + resp = recv_msg(self._sock) + if not resp.get("ok"): + raise RuntimeError(resp.get("error", f"Failed to get {key}")) + return resp["value"] + + def _set(self, key: str, value: Any) -> None: + send_msg(self._sock, {"cmd": "set", "key": key, "value": value}) + resp = recv_msg(self._sock) + if not resp.get("ok"): + raise RuntimeError(resp.get("error", f"Failed to set {key}")) + + def _action(self, cmd: str) -> None: + send_msg(self._sock, {"cmd": cmd}) + resp = recv_msg(self._sock) + if not resp.get("ok"): + raise RuntimeError(resp.get("error", f"Action {cmd} failed")) + + def close(self) -> None: + """Close the connection.""" + try: + self._sock.close() + except Exception: + pass + + def __enter__(self): + return self + + def __exit__(self, *args): + self.close() + + def __repr__(self) -> str: + return f"Client({self._host!r}, port={self._port})" + + +def _make_pbar(total=None, desc="", unit="it"): + """Create a tqdm progress bar if available, else None.""" + try: + from tqdm.auto import tqdm + return tqdm(total=total, desc=desc, unit=unit) + except ImportError: + return None diff --git a/src/team1k/config.py b/src/team1k/config.py new file mode 100644 index 0000000..8d9b816 --- /dev/null +++ b/src/team1k/config.py @@ -0,0 +1,177 @@ +""" +YAML configuration loader. + +Replaces the old parameter file parser with a structured YAML config +for all detector, server, and peripheral settings. +""" +from __future__ import annotations +from collections import OrderedDict +from enum import IntEnum +from pydantic import Field, BaseModel, PlainSerializer, field_serializer +from pydantic_yaml import parse_yaml_raw_as, to_yaml_file, to_yaml_str +import logging +from pathlib import Path +from typing import Any, Annotated +import os + +logger = logging.getLogger(__name__) + +class ExposureMode(IntEnum): + ROLLING_SHUTTER = 0 + ROLLING_SHUTTER_WITH_PAUSE = 1 + GLOBAL_SHUTTER = 2 + GLOBAL_SHUTTER_CDS = 3 + +class TriggerMode(IntEnum): + INTERNAL = 0 + EXTERNAL = 1 + + # Support str and int values for convenience + @classmethod + def _missing_(cls, value: Any) -> TriggerMode: + if isinstance(value, str): + return cls[value.upper()] + raise ValueError(f"Invalid TriggerMode value: {value}") + +class TriggerPolarity(IntEnum): + RISING_EDGE = 0 + FALLING_EDGE = 1 + + # Support str values for convenience + @classmethod + def _missing_(cls, value: Any) -> TriggerPolarity: + if isinstance(value, str): + return cls[value.upper()] + raise ValueError(f"Invalid TriggerPolarity value: {value}") + +class BellowStageConfig(BaseModel): + """Configuration for the bellow stage controller.""" + port: str = "/dev/CameraBellowStage" + inserted_position_um: int = 0 + retracted_position_um: int = -2_000_000 + absolute_positioning: bool = False + baudrate: int = 9600 + speed: int = 100_000 + +class PowerSupplyChannelConfig(BaseModel): + voltage: float = 0.0 + current: float = 0.0 + ovp: float = 0.0 + channel: Annotated[int, Field(ge=1, le=4)] = 1 + +def ser_number(value: Any) -> int: + return value + +class PowerSupplyConfig(BaseModel): + """Configuration for a programmable power supply.""" + port: str = "" + voltage_step: float = 0.2 + #channels: list[PowerSupplyChannelConfig] = Field(default=[PowerSupplyChannelConfig(channel=i) for i in range(1, 5)]) + channels: dict[Annotated[int, PlainSerializer(ser_number, return_type=int)], PowerSupplyChannelConfig] = Field(default=dict([(i, PowerSupplyChannelConfig(channel=i)) for i in range(1, 5)])) + + # @field_serializer('channels') + # def serialize_channels(self, channels: list[PowerSupplyChannelConfig], _info) -> list[dict]: + # # Filter out channels with zero voltage and current for cleaner YAML output + # filtered_items = [item.model_dump() for item in channels if any([item.voltage, item.current, item.ovp])] + # return filtered_items + +class DetectorConfig(BaseModel): + """Configuration for the detector connection.""" + ip: str = "10.0.0.32" + register_port: int = 42000 + data_port: int = 41000 + auto_reconnect: bool = True + reconnect_interval: float = 5.0 + +class ADCConfig(BaseModel): + """Configuration for ADC settings.""" + clock_frequency_mhz: Annotated[float, Field(ge=50, le=100)] = 50.0 + """ADC clock frequency (50-100 MHz).""" + data_delay: int = 0x180 + order_7to0: int = 0x89ABCDEF + order_15to8: int = 0x01234567 + +class DigitalSignalConfig(BaseModel): + """Configuration for digital signal settings.""" + order_7to0: int = 0x24FFFFFF + order_15to8: int = 0xF5F310FF + polarity: int = 0x0014 + +class DefaultsConfig(BaseModel): + """Default values for various parameters.""" + exposure_mode: ExposureMode = ExposureMode.GLOBAL_SHUTTER_CDS + trigger_mode: TriggerMode = TriggerMode.EXTERNAL + trigger_polarity: TriggerPolarity = TriggerPolarity.RISING_EDGE + integration_time_ms: float = 4.0 + +class ServerConfig(BaseModel): + """Configuration for the server.""" + pv_prefix: str = "TEAM1K:" + client_port: int = 42010 + +class PeripheralConfig(BaseModel): + """Configuration for peripheral devices.""" + bellow_stage: BellowStageConfig = Field(default_factory=BellowStageConfig) + detector_power: PowerSupplyConfig = Field(default_factory=PowerSupplyConfig) + tec: PowerSupplyConfig = Field(default_factory=PowerSupplyConfig) + +class Team1kConfig(BaseModel): + """Complete server configuration loaded from YAML.""" + config_version: int = 1 + + detector: DetectorConfig = Field(default_factory=DetectorConfig) + """Detector connection settings.""" + + adc: ADCConfig = Field(default_factory=ADCConfig) + """ADC configuration parameters.""" + + digital: DigitalSignalConfig = Field(default_factory=DigitalSignalConfig) + """Digital signal configuration parameters.""" + + defaults: DefaultsConfig = Field(default_factory=DefaultsConfig) + """Default DAQ parameters.""" + + server: ServerConfig = Field(default_factory=ServerConfig) + """Server configuration.""" + + peripherals: PeripheralConfig = Field(default_factory=PeripheralConfig) + """Peripheral device configurations, including power supplies and bellow stage.""" + + +def load_config(path: str | Path | None = None) -> Team1kConfig: + """ + Load configuration from a YAML file. + + Missing keys fall back to defaults. If no path is given, + returns a config with all defaults. + + Args: + path: Path to YAML config file, or None for defaults. + + Returns: + Populated Team1kConfig. + """ + if path is not None and not Path(path).is_file(): + logger.info("Config file not found: %s, creating it using defaults", path) + config = Team1kConfig() + print(config) + print(config.model_dump(mode='python')) + to_yaml_file(path, config)#, add_comments=True) + + if path is None: + path = Path(os.getenv("TEAM1K_CONFIG", os.path.join(os.path.expanduser("~"), ".team1k_config.yaml"))) + logger.info("No config file specified, attempting %s", path) + + path = Path(path) + if not path.is_file(): + logger.warning("Config file not found: %s, using defaults", path) + return Team1kConfig() + + with open(path) as f: + config = parse_yaml_raw_as(Team1kConfig, f.read()) + + logger.info("Config loaded: detector=%s:%d, prefix=%s", + config.detector_ip, config.register_port, config.pv_prefix) + + logger.debug("Full config:\n%s", to_yaml_str(config)) + return config diff --git a/src/team1k/detector/__init__.py b/src/team1k/detector/__init__.py index d900217..ef1ffbb 100644 --- a/src/team1k/detector/__init__.py +++ b/src/team1k/detector/__init__.py @@ -4,4 +4,3 @@ from .data_port import DataPort from .chip_config import ChipConfig, TEAM1K_CONFIG, configure_chip from .commands import DetectorCommands from .adc import ADCController -from .parameter_file import apply_parameter_file diff --git a/src/team1k/detector/parameter_file.py b/src/team1k/detector/parameter_file.py deleted file mode 100644 index b9c1c19..0000000 --- a/src/team1k/detector/parameter_file.py +++ /dev/null @@ -1,179 +0,0 @@ -""" -Parameter file parser. - -Ports KTestFunctions::ReadParameterFile from KTestFunctions.cxx (lines 1078-1195). -""" - -import logging -from pathlib import Path -from typing import TYPE_CHECKING - -if TYPE_CHECKING: - from .registers import RegisterInterface - from .commands import DetectorCommands - from .adc import ADCController - -logger = logging.getLogger(__name__) - - -def _parse_int(s: str) -> int: - """Parse an integer from string, supporting both decimal and hex (0x prefix).""" - s = s.strip() - if 'x' in s or 'X' in s: - return int(s, 16) - return int(s) - - -def apply_parameter_file( - filepath: str, - commands: 'DetectorCommands', - adc: 'ADCController', - registers: 'RegisterInterface', - restart_daq_callback=None, -) -> tuple[bool, str]: - """ - Parse and apply all settings from a parameter file. - - The file format is one setting per line: - keyword value1 [value2 ...] - Lines starting with '--' are comments. - - Supported keywords: - ADC_clock_frequency_in_MHz - ADC_data_delay - chip_type (ignored — only team1k is supported) - exposure_mode - trigger_mode - integration_time - digital_signal_polarity - digital_signal_order_7downto0 - digital_signal_order_15downto8 - ADC_order_7downto0 - ADC_order_15downto8 - restart_daq - - Args: - filepath: Path to the parameter file. - commands: DetectorCommands instance. - adc: ADCController instance. - registers: RegisterInterface instance. - restart_daq_callback: Optional callable to restart DAQ (for 'restart_daq' keyword). - - Returns: - Tuple of (success, output_message). - """ - path = Path(filepath) - if not path.is_file(): - msg = f"Parameter file not found: {filepath}" - logger.error(msg) - return False, msg - - output = [f"Reading parameter file: {filepath}"] - logger.info("Reading parameter file: %s", filepath) - - with open(path, 'r') as f: - for line_num, raw_line in enumerate(f, start=1): - line = raw_line.strip() - if not line: - continue - if line.startswith('--'): - continue - - parts = line.split() - if not parts: - continue - - keyword = parts[0] - args = parts[1:] - - ok = False - msg = "" - - try: - if keyword == "ADC_clock_frequency_in_MHz": - freq = float(args[0]) - ok = adc.set_clock_freq(freq) - msg = f"Set ADC clock frequency to {freq} MHz" - - elif keyword == "ADC_data_delay": - delay = _parse_int(args[0]) - ok = commands.set_adc_data_delay(delay) - msg = f"Set ADC data delay to 0x{delay:x}" - - elif keyword == "chip_type": - if args[0] == "team1k": - ok = True - msg = "Chip type: team1k (confirmed)" - else: - msg = f"Unsupported chip type: {args[0]} (only team1k supported)" - ok = False - - elif keyword == "exposure_mode": - mode = _parse_int(args[0]) - ok = commands.set_exposure_mode(mode) - msg = f"Set exposure mode to {mode}" - - elif keyword == "trigger_mode": - external = bool(_parse_int(args[0])) - polarity = bool(_parse_int(args[1])) if len(args) > 1 else False - ok = commands.set_trigger_mode(external, polarity) - msg = f"Set trigger mode: external={external}, polarity={polarity}" - - elif keyword == "integration_time": - time_ms = float(args[0]) - ok = commands.set_integration_time(time_ms) - msg = f"Set integration time to {time_ms} ms" - - elif keyword == "digital_signal_polarity": - val = _parse_int(args[0]) - ok = registers.write_register(27, val) - msg = f"Set digital signal polarity to {args[0]}" - - elif keyword == "digital_signal_order_7downto0": - val = _parse_int(args[0]) - ok = registers.write_register(28, val) - msg = f"Set digital signals 7:0 order to {args[0]}" - - elif keyword == "digital_signal_order_15downto8": - val = _parse_int(args[0]) - ok = registers.write_register(29, val) - msg = f"Set digital signals 15:8 order to {args[0]}" - - elif keyword == "ADC_order_7downto0": - val = _parse_int(args[0]) - ok = registers.write_register(30, val) - msg = f"Set ADC 7:0 order to {args[0]}" - - elif keyword == "ADC_order_15downto8": - val = _parse_int(args[0]) - ok = registers.write_register(31, val) - msg = f"Set ADC 15:8 order to {args[0]}" - - elif keyword == "restart_daq": - if restart_daq_callback: - ok = restart_daq_callback() - else: - ok = True - msg = "Restarted DAQ" - - else: - msg = f"Unknown keyword: {keyword}" - ok = False - - except (IndexError, ValueError) as e: - msg = f"Error parsing line {line_num}: {line} ({e})" - ok = False - - if ok: - output.append(f" {msg}") - else: - error_msg = f" Line {line_num}: {msg or f'Error: {line}'}" - output.append(error_msg) - logger.error("Parameter file error at line %d: %s", line_num, msg or line) - result = "\n".join(output) - return False, result - - output.append(f"Done reading parameter file: {filepath}") - result = "\n".join(output) - logger.info("Parameter file applied successfully") - return True, result diff --git a/src/team1k/filewriter/__init__.py b/src/team1k/filewriter/__init__.py deleted file mode 100644 index 85422d5..0000000 --- a/src/team1k/filewriter/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .capture import FrameCapture, DataTransferServer diff --git a/src/team1k/filewriter/capture.py b/src/team1k/filewriter/capture.py deleted file mode 100644 index 4b11e6f..0000000 --- a/src/team1k/filewriter/capture.py +++ /dev/null @@ -1,264 +0,0 @@ -""" -On-demand frame capture and ZMQ data transfer. - -Instead of continuous local file writing, this module captures a requested -number of frames from the shared memory ring buffer into a temp file, then -serves them to remote clients via ZMQ. This is designed for the common case -where the server runs on a dedicated detector machine and clients need to -retrieve data over a (slower) network. - -Flow: - 1. Client triggers capture via PVA: pvput TEAM1K:CMD:CAPTURE - 2. FrameCapture thread reads N frames from shmring → temp file - 3. Client polls TEAM1K:CAPTURE:STATUS via PVA until "READY" - 4. Client fetches data via ZMQ: sends b"FETCH", receives [header, data] -""" - -import json -import logging -import tempfile -import threading -from pathlib import Path - -import numpy as np -import zmq - -from ..detector.chip_config import ChipConfig, TEAM1K_CONFIG - -logger = logging.getLogger(__name__) - -DEFAULT_ZMQ_PORT = 42005 - - -class FrameCapture(threading.Thread): - """ - Captures frames from shmring on demand. - - When triggered via capture(), reads N consecutive frames from the ring - buffer and writes them to a temp file. Status is tracked and exposed - for the PVA interface. - """ - - def __init__(self, ring_name: str, chip_config: ChipConfig = TEAM1K_CONFIG): - super().__init__(daemon=True, name="team1k-capture") - self._ring_name = ring_name - self._config = chip_config - self._shutdown = threading.Event() - - # Capture state - self._capture_event = threading.Event() - self._capture_done = threading.Event() - self._num_requested = 0 - self._num_captured = 0 - self._status = "IDLE" - self._error_msg = "" - self._temp_path: str | None = None - self._data_lock = threading.Lock() - - @property - def status(self) -> str: - return self._status - - @property - def num_captured(self) -> int: - return self._num_captured - - @property - def num_requested(self) -> int: - return self._num_requested - - @property - def error_message(self) -> str: - return self._error_msg - - def capture(self, num_frames: int) -> None: - """Request capture of N frames from the ring buffer.""" - self._num_requested = num_frames - self._num_captured = 0 - self._status = "CAPTURING" - self._error_msg = "" - self._capture_done.clear() - self._capture_event.set() - logger.info("Capture requested: %d frames", num_frames) - - def wait_complete(self, timeout: float | None = None) -> bool: - """Wait for current capture to complete. Returns True if complete.""" - return self._capture_done.wait(timeout) - - def get_data(self) -> tuple[dict, bytes] | None: - """ - Read captured data. Returns (header_dict, raw_bytes) or None. - - The caller should hold this briefly — a new capture will block - until this returns. - """ - with self._data_lock: - if self._status != "READY" or not self._temp_path: - return None - header = { - "num_frames": self._num_captured, - "nx": self._config.image_nx, - "ny": self._config.image_ny, - "dtype": "uint16", - } - try: - data = Path(self._temp_path).read_bytes() - except Exception as e: - logger.error("Failed to read capture file: %s", e) - return None - return header, data - - def run(self) -> None: - """Main thread loop: wait for capture requests.""" - from locking_shmring import RingBufferReader - - reader = RingBufferReader(self._ring_name) - logger.info("Frame capture connected to ring buffer '%s'", self._ring_name) - - try: - while not self._shutdown.is_set(): - if not self._capture_event.wait(0.2): - continue - self._capture_event.clear() - self._do_capture(reader) - except Exception as e: - logger.error("Frame capture thread error: %s", e) - finally: - reader.close() - - def _do_capture(self, reader) -> None: - """Capture frames to a temp file.""" - # Clean up previous temp file (under lock so concurrent reads finish) - with self._data_lock: - if self._temp_path: - try: - Path(self._temp_path).unlink(missing_ok=True) - except Exception: - pass - self._temp_path = None - - try: - fd, temp_path = tempfile.mkstemp(suffix='.raw', prefix='team1k_capture_') - with open(fd, 'wb') as f: - for slot in reader.iter_frames(timeout_ms=5000): - if self._shutdown.is_set(): - self._status = "ERROR" - self._error_msg = "Shutdown during capture" - self._capture_done.set() - return - - with slot: - frame = slot.data_as_numpy( - (self._config.image_ny, self._config.image_nx), - np.uint16, - ) - f.write(frame.tobytes()) - self._num_captured += 1 - - if self._num_captured >= self._num_requested: - break - - if self._num_captured < self._num_requested: - self._status = "ERROR" - self._error_msg = ( - f"Only captured {self._num_captured}/{self._num_requested} " - f"frames (ring timeout)" - ) - logger.error(self._error_msg) - else: - with self._data_lock: - self._temp_path = temp_path - self._status = "READY" - logger.info("Capture complete: %d frames (%s)", - self._num_captured, temp_path) - - except Exception as e: - self._status = "ERROR" - self._error_msg = str(e) - logger.error("Capture error: %s", e) - - self._capture_done.set() - - def stop(self) -> None: - """Stop the capture thread and clean up.""" - self._shutdown.set() - if self._temp_path: - try: - Path(self._temp_path).unlink(missing_ok=True) - except Exception: - pass - - -class DataTransferServer(threading.Thread): - """ - ZMQ server for transferring captured frame data to clients. - - Runs a REP socket. Supports two request types: - b"FETCH" — returns [header_json, frame_data_bytes] - b"STATUS" — returns JSON with capture status - """ - - def __init__(self, capture: FrameCapture, chip_config: ChipConfig = TEAM1K_CONFIG, - port: int = DEFAULT_ZMQ_PORT): - super().__init__(daemon=True, name="team1k-data-transfer") - self._capture = capture - self._config = chip_config - self._port = port - self._shutdown = threading.Event() - - def run(self) -> None: - ctx = zmq.Context() - sock = ctx.socket(zmq.REP) - sock.bind(f"tcp://*:{self._port}") - sock.setsockopt(zmq.RCVTIMEO, 200) - logger.info("Data transfer server listening on tcp://*:%d", self._port) - - try: - while not self._shutdown.is_set(): - try: - msg = sock.recv() - except zmq.Again: - continue - self._handle(sock, msg) - finally: - sock.close() - ctx.term() - - def _handle(self, sock: zmq.Socket, msg: bytes) -> None: - try: - request = msg.decode('utf-8').strip() - except UnicodeDecodeError: - sock.send(json.dumps({"error": "Invalid request"}).encode()) - return - - if request == "FETCH": - self._handle_fetch(sock) - elif request == "STATUS": - sock.send(json.dumps({ - "status": self._capture.status, - "num_captured": self._capture.num_captured, - "num_requested": self._capture.num_requested, - "error": self._capture.error_message, - }).encode()) - else: - sock.send(json.dumps({"error": f"Unknown request: {request}"}).encode()) - - def _handle_fetch(self, sock: zmq.Socket) -> None: - result = self._capture.get_data() - if result is None: - sock.send_multipart([ - json.dumps({ - "error": "No data available", - "status": self._capture.status, - }).encode(), - b"", - ]) - return - - header, data = result - sock.send_multipart([json.dumps(header).encode(), data]) - logger.info("Sent %d frames (%d bytes) to client", - header["num_frames"], len(data)) - - def stop(self) -> None: - self._shutdown.set() diff --git a/src/team1k/peripherals/__init__.py b/src/team1k/peripherals/__init__.py index 5aef3ee..2631565 100644 --- a/src/team1k/peripherals/__init__.py +++ b/src/team1k/peripherals/__init__.py @@ -1,2 +1,4 @@ from .bellow_stage import BellowStage -from .power_supply import PowerSupply +from .power_supply import DetectorPowerSupply +from .tec import TECController +from .power_base import PowerSupplyBase diff --git a/src/team1k/peripherals/bellow_stage.py b/src/team1k/peripherals/bellow_stage.py index 616483d..b213cbc 100644 --- a/src/team1k/peripherals/bellow_stage.py +++ b/src/team1k/peripherals/bellow_stage.py @@ -9,18 +9,15 @@ import logging import serial +from ..config import BellowStageConfig + logger = logging.getLogger(__name__) -DEFAULT_PORT = '/dev/CameraBellowStage' -DEFAULT_BAUDRATE = 9600 - - class BellowStage: """Controls the camera bellow stage motor via serial port.""" - def __init__(self, port: str = DEFAULT_PORT, baudrate: int = DEFAULT_BAUDRATE): - self._port = port - self._baudrate = baudrate + def __init__(self, config: BellowStageConfig): + self._config = config self.position: int = 0 self.is_moving: bool = False @@ -38,42 +35,40 @@ class BellowStage: logger.info("%s detector", action) self.is_moving = True + if insert: + target_position = self._config.inserted_position_um + else: + target_position = self._config.retracted_position_um + try: - with serial.Serial(self._port, self._baudrate, timeout=1) as ser: - time.sleep(0.5) - ser.write(b'\r') - time.sleep(0.5) - ser.write(b'\x03') # Ctrl-C to stop any ongoing movement - time.sleep(0.5) + with serial.Serial(self._config.port, self._config.baudrate, timeout=1) as ser: + ser.write(b'\r') # Clear any previous command + ser.write(b'\x03') # Ctrl-C to stop any ongoing movement, restart the stage - ser.write(b'rc=100\r') - time.sleep(0.5) - - if insert: - ser.write(b'vm=100000\r') - time.sleep(0.5) - ser.write(b'mr 2000000\r') - else: - ser.write(b'vm=100001\r') - time.sleep(0.5) - ser.write(b'mr -2000000\r') - - # Wait for movement - time.sleep(10) - - # Read position - ser.write(b'pr p\r') - time.sleep(0.5) - response_bytes = ser.read_all() - response = response_bytes.decode('utf-8', errors='ignore') if response_bytes else "" - - for line in response.splitlines(): - if line.strip().startswith('p='): - try: - self.position = int(line.strip()[2:]) - except ValueError: - pass + for _ in range(3): + ser.write(b'EM=3\r') # Set ECHO mode to only respond after command completion + if ser.readline().strip() == b'EM=3': break + else: + logger.error("Failed to set ECHO mode on bellows stage.") + return False, "Failed to set ECHO mode on bellows stage." + + ser.write(f'RC=100\r'.encode()) # Set the run current + ser.readline() # Wait for command finish + + ser.write(f'VM={self._config.speed}\r'.encode()) # Set max velocity + ser.readline() # Wait for command finish + + self.is_moving = True + if self._config.absolute_positioning: + ser.write(f'MA={target_position}\r'.encode()) # Move to absolute position + else: + ser.write(f'MR={target_position}\r'.encode()) # Move relative to current position + ser.readline() # Wait for command finish + + ser.write(b'pr p\r') # Print current position + ser.readline() # Read echo + self.position = int(ser.readline()) # Read position self.is_moving = False logger.info("Detector %s successfully (position=%d)", diff --git a/src/team1k/peripherals/power_base.py b/src/team1k/peripherals/power_base.py new file mode 100644 index 0000000..63b5519 --- /dev/null +++ b/src/team1k/peripherals/power_base.py @@ -0,0 +1,149 @@ +""" +Base class for VISA-controlled programmable power supplies. + +Used by both the detector power supply and TEC controller. +Based on Rohde & Schwarz HMP series SCPI commands. +""" + +import logging +import threading +import dataclasses + +import pyvisa + +logger = logging.getLogger(__name__) + + +class PowerSupplyBase: + """ + Programmable power supply controller via VISA (serial). + + Args: + port: Device port (e.g., "/dev/DetectorPowerSupply" or "COM3"). + settings: Channel settings: {ch_num: (voltage, current, ovp)}. + voltage_step: Voltage step size for ramping (default 0.2V). + name: Human-readable name for logging. + """ + + @dataclasses.dataclass + class ChannelStatus: + voltage: float + current: float + + def __init__(self, port: str, + settings: dict[int, tuple[float, float, float]], + voltage_step: float = 0.2, + name: str = "Power Supply"): + self._port = f"ASRL{port}::INSTR" + self._settings = settings + self._voltage_step = voltage_step + self.name = name + self._is_on = False + self._lock = threading.Lock() + + def _open_resource(self) -> pyvisa.Resource: + """Open the VISA resource.""" + rm = pyvisa.ResourceManager("@py") + return rm.open_resource(self._port) + + def initialize(self) -> None: + """ + Configure all channels with their voltage, current, and OVP settings. + + All outputs are disabled during configuration. + """ + with self._lock: + try: + inst = self._open_resource() + inst.write("OUTP:GEN 0 \n") + + for ch, (voltage, current, ovp) in self._settings.items(): + inst.write(f"INST:NSEL {ch} \n") + inst.write(f"OUTP:SEL 0 \n") + inst.write(f"APPL {voltage}, {current} \n") + inst.write(f"SOUR:VOLT:PROT {ovp} \n") + inst.write(f"SOUR:VOLT:STEP {self._voltage_step} \n") + + volt = float(inst.query("SOUR:VOLT? \n")) + curr = float(inst.query("SOUR:CURR? \n")) + vpro = float(inst.query("SOUR:VOLT:PROT? \n")) + logger.debug("%s Ch%d: %.4fV, %.4fA, %.4fV-OVP", + self.name, ch, volt, curr, vpro) + + inst.close() + logger.info("%s initialized", self.name) + except Exception as e: + logger.error("%s initialization failed: %s", self.name, e) + raise + + def power_on(self) -> None: + """Turn on all configured channels.""" + with self._lock: + try: + if not self._is_on: + self.initialize() + + inst = self._open_resource() + + for ch, (voltage, _, _) in self._settings.items(): + output_on = voltage > 0 + inst.write(f"INST:NSEL {ch} \n") + inst.write(f"OUTP:SEL {1 if output_on else 0} \n") + + inst.write("OUTP:GEN 1 \n") + inst.close() + + self._is_on = True + logger.info("%s turned ON", self.name) + except Exception as e: + logger.error("%s power on failed: %s", self.name, e) + raise + + def power_off(self) -> None: + """Turn off all channels.""" + with self._lock: + try: + inst = self._open_resource() + inst.write("OUTP:GEN 0 \n") + + for ch in self._settings: + inst.write(f"INST:NSEL {ch} \n") + inst.write(f"OUTP:SEL 0 \n") + + inst.close() + + self._is_on = False + logger.info("%s turned OFF", self.name) + except Exception as e: + logger.error("%s power off failed: %s", self.name, e) + raise + + def get_status(self) -> dict[int, ChannelStatus]: + """ + Read measured voltage and current for each active channel. + + Returns: + {channel: {"voltage": float, "current": float}} + """ + with self._lock: + status = {} + try: + inst = self._open_resource() + + for ch, (voltage, _, _) in self._settings.items(): + if voltage == 0: + continue + inst.write(f"INST:NSEL {ch} \n") + volt = float(inst.query("MEAS:VOLT? \n")) + curr = float(inst.query("MEAS:CURR? \n")) + status[ch] = self.ChannelStatus(voltage=volt, current=curr) + + inst.close() + except Exception as e: + logger.debug("%s status read failed: %s", self.name, e) + + return status + + @property + def is_on(self) -> bool: + return self._is_on diff --git a/src/team1k/peripherals/power_supply.py b/src/team1k/peripherals/power_supply.py index 7351427..fb4cbef 100644 --- a/src/team1k/peripherals/power_supply.py +++ b/src/team1k/peripherals/power_supply.py @@ -1,37 +1,59 @@ """ -Power supply control. +Detector power supply controller. -Extracted from the existing server.py. +Wraps PowerSupplyBase with the detector-specific VISA port and +channel settings from the YAML config. """ import logging +from .power_base import PowerSupplyBase +from ..config import PowerSupplyConfig + logger = logging.getLogger(__name__) -class PowerSupply: - """Controls the detector power supply.""" +class DetectorPowerSupply(PowerSupplyBase): + """ + Detector power supply. - def __init__(self): - self.is_on: bool = False - self.voltages: list[float] = [] + Instantiated from the config's ``detector_power`` section. + If no port is configured, all operations are no-ops. + """ - def turn_on(self) -> tuple[bool, str | None]: - """Turn on the power supply.""" - # TODO: Implement actual power supply control - # The original server.py had placeholder serial port communication - self.is_on = True - logger.info("Power supply turned on") - return True, None + def __init__(self, config: PowerSupplyConfig): + self._enabled = bool(config.port) + if self._enabled: + super().__init__( + port=config.port, + settings=config.settings, + voltage_step=config.voltage_step, + name="Detector Power Supply", + ) + else: + self._is_on = False + self._lock = None + logger.info("Detector power supply not configured (no port)") - def turn_off(self) -> tuple[bool, str | None]: - """Turn off the power supply.""" - self.is_on = False - logger.info("Power supply turned off") - return True, None + def initialize(self) -> None: + if self._enabled: + super().initialize() - def read_voltages(self) -> tuple[list[float] | None, str | None]: - """Read voltage values from the power supply.""" - # TODO: Implement actual voltage reading - logger.info("Reading power supply voltages") - return self.voltages, None + def power_on(self) -> None: + if self._enabled: + super().power_on() + else: + self._is_on = True + logger.info("Detector power supply: power_on (no-op, not configured)") + + def power_off(self) -> None: + if self._enabled: + super().power_off() + else: + self._is_on = False + logger.info("Detector power supply: power_off (no-op, not configured)") + + def get_status(self) -> dict[int, PowerSupplyBase.ChannelStatus]: + if self._enabled: + return super().get_status() + return {} diff --git a/src/team1k/peripherals/tec.py b/src/team1k/peripherals/tec.py new file mode 100644 index 0000000..30e2e09 --- /dev/null +++ b/src/team1k/peripherals/tec.py @@ -0,0 +1,59 @@ +""" +TEC (thermoelectric cooler) power supply controller. + +Wraps PowerSupplyBase with the TEC-specific VISA port and +channel settings from the YAML config. +""" + +import logging + +from .power_base import PowerSupplyBase +from ..config import PowerSupplyConfig + +logger = logging.getLogger(__name__) + + +class TECController(PowerSupplyBase): + """ + TEC power supply. + + Instantiated from the config's ``tec`` section. + If no port is configured, all operations are no-ops. + """ + + def __init__(self, config: PowerSupplyConfig): + self._enabled = bool(config.port) + if self._enabled: + super().__init__( + port=config.port, + settings=config.settings, + voltage_step=config.voltage_step, + name="TEC Power Supply", + ) + else: + self._is_on = False + self._lock = None + logger.info("TEC power supply not configured (no port)") + + def initialize(self) -> None: + if self._enabled: + super().initialize() + + def power_on(self) -> None: + if self._enabled: + super().power_on() + else: + self._is_on = True + logger.info("TEC: power_on (no-op, not configured)") + + def power_off(self) -> None: + if self._enabled: + super().power_off() + else: + self._is_on = False + logger.info("TEC: power_off (no-op, not configured)") + + def get_status(self) -> dict[int, dict[str, float]]: + if self._enabled: + return super().get_status() + return {} diff --git a/src/team1k/pva/interface.py b/src/team1k/pva/interface.py index 0ce500e..460a5ff 100644 --- a/src/team1k/pva/interface.py +++ b/src/team1k/pva/interface.py @@ -1,8 +1,9 @@ """ -PVA server setup, command PVs, and status PVs. +PVA server with areaDetector-style PV naming. -Replaces all ZMQ communication (REQ/REP for commands, PUB for status) -with EPICS PV Access channels. +Uses RW PVs with _RBV readbacks following EPICS areaDetector conventions. +NTEnum for exposure mode, trigger mode, and detector state. +NTScalar bool for toggles (Acquire, Power, TEC, Bellow). """ import logging @@ -11,10 +12,14 @@ from typing import TYPE_CHECKING, Any import numpy as np -from p4p import Value from p4p.server import Server, ServerOperation from p4p.server.thread import SharedPV -from p4p.nt import NTScalar, NTNDArray +from p4p.nt import NTScalar, NTNDArray, NTEnum + +from ..state import ( + DetectorState, DETECTOR_STATE_CHOICES, + EXPOSURE_MODE_CHOICES, TRIGGER_MODE_CHOICES, +) if TYPE_CHECKING: from ..server import Team1kServer @@ -23,76 +28,84 @@ logger = logging.getLogger(__name__) class _ReadOnlyHandler: - """Handler for read-only status PVs.""" + """Handler that rejects writes for read-only PVs.""" def put(self, pv: SharedPV, op: ServerOperation): op.done(error="PV is read-only") - def rpc(self, pv: SharedPV, op: ServerOperation): - op.done(error="RPC not supported") - -class _CommandHandler: +class _RWHandler: """ - Handler for writable command PVs. + Handler for writable PVs. - When a client puts a value, the handler dispatches to the server. + On put, dispatches to server.apply_parameter(), then updates + both the setpoint PV and the corresponding _RBV PV. """ - def __init__(self, server: 'Team1kServer', command_name: str): + def __init__(self, server: 'Team1kServer', param_name: str, + rbv_pv: SharedPV | None = None): self._server = server - self._command = command_name + self._param = param_name + self._rbv = rbv_pv def put(self, pv: SharedPV, op: ServerOperation): try: raw = op.value() - # Extract the actual value from the NTScalar wrapper value = raw.value if hasattr(raw, 'value') else raw - result = self._server.execute_command(self._command, value) + # For NTEnum, extract the index + if hasattr(raw, 'index'): + value = raw.index + self._server.apply_parameter(self._param, value) pv.post(raw) + if self._rbv is not None: + self._rbv.post(raw) op.done() except Exception as e: - logger.error("Command '%s' failed: %s", self._command, e) + logger.error("Parameter '%s' put failed: %s", self._param, e) + op.done(error=str(e)) + + +class _ActionHandler: + """Handler for action PVs (bellow insert/retract, etc.).""" + + def __init__(self, server: 'Team1kServer', action_name: str): + self._server = server + self._action = action_name + + def put(self, pv: SharedPV, op: ServerOperation): + try: + raw = op.value() + value = raw.value if hasattr(raw, 'value') else raw + if value: + self._server.execute_action(self._action) + op.done() + except Exception as e: + logger.error("Action '%s' failed: %s", self._action, e) op.done(error=str(e)) class PVAInterface: """ - EPICS PV Access server for commands, status, and data streaming. + EPICS PV Access server following areaDetector conventions. - PV layout: - Status (read-only): - {prefix}STATUS - string array of status lines - {prefix}ACQUIRING - bool, DAQ running - {prefix}FRAME_RATE - float, current frame rate Hz - {prefix}FRAME_COUNT - int, total frames acquired - {prefix}EXPOSURE_MODE - int, current exposure mode - {prefix}TRIGGER_MODE - int, current trigger mode - {prefix}INTEGRATION_TIME - float, integration time ms - {prefix}CAPTURE:STATUS - string, capture state (IDLE/CAPTURING/READY/ERROR) - {prefix}CAPTURE:PROGRESS - int, frames captured so far - {prefix}CAPTURE:TOTAL - int, frames requested - - Commands (writable): - {prefix}CMD:EXPOSURE_MODE - int, set exposure mode - {prefix}CMD:TRIGGER_MODE - int, set trigger mode - {prefix}CMD:INTEGRATION_TIME - float, set integration time ms - {prefix}CMD:START_DAQ - int, 1=start, 0=stop - {prefix}CMD:CAPTURE - int, capture N frames - {prefix}CMD:ADC_CLOCK_FREQ - float, set ADC clock MHz - {prefix}CMD:ADC_DATA_DELAY - int, set ADC data delay - {prefix}CMD:PARAMETER_FILE - string, apply parameter file - {prefix}CMD:RESET - int, 1=reset detector connection - {prefix}CMD:TEST_MODE - int, 0/1 - - Data: - {prefix}IMAGE - NTNDArray, live image stream - - Peripherals: - {prefix}BELLOW:CMD - int, 1=insert, 0=retract - {prefix}BELLOW:POSITION - int, current position - {prefix}POWER:CMD - int, 1=on, 0=off - {prefix}POWER:VOLTAGES - float array, voltage readings + PV naming uses RW + _RBV pattern: + {prefix}Acquire / Acquire_RBV bool Start/stop DAQ + {prefix}AcquireTime / AcquireTime_RBV float Integration time (s) + {prefix}ExposureMode / ExposureMode_RBV NTEnum Exposure mode + {prefix}TriggerMode / TriggerMode_RBV NTEnum Trigger mode + {prefix}ArrayCounter_RBV int Frame count + {prefix}FrameRate_RBV float Hz + {prefix}DetectorState_RBV NTEnum State + {prefix}MaxSizeX_RBV / MaxSizeY_RBV int 1024 + {prefix}IMAGE NTNDArray + {prefix}Bellow:Insert / Bellow:Retract bool Actions + {prefix}Bellow:Position_RBV int + {prefix}Power:Enable / Power:Enable_RBV bool + {prefix}Power:ChN:Voltage_RBV float + {prefix}Power:ChN:Current_RBV float + {prefix}TEC:Enable / TEC:Enable_RBV bool + {prefix}TEC:ChN:Voltage_RBV float + {prefix}TEC:ChN:Current_RBV float """ def __init__(self, server: 'Team1kServer', prefix: str = "TEAM1K:"): @@ -100,109 +113,143 @@ class PVAInterface: self._prefix = prefix self._pvs: dict[str, SharedPV] = {} self._pva_server: Server | None = None - self._ro_handler = _ReadOnlyHandler() + self._ro = _ReadOnlyHandler() @property def image_pv(self) -> SharedPV: return self._pvs[f"{self._prefix}IMAGE"] - def _make_status_pv(self, nt, initial) -> SharedPV: - """Create a read-only status PV.""" - pv = SharedPV(handler=self._ro_handler, nt=nt, initial=initial) + def _pv_name(self, suffix: str) -> str: + return f"{self._prefix}{suffix}" + + def _add_ro(self, suffix: str, nt, initial) -> SharedPV: + """Create and register a read-only PV.""" + pv = SharedPV(handler=self._ro, nt=nt, initial=initial) + self._pvs[self._pv_name(suffix)] = pv return pv - def _make_command_pv(self, nt, initial, command_name: str) -> SharedPV: - """Create a writable command PV.""" - handler = _CommandHandler(self._server, command_name) + def _add_rw(self, suffix: str, nt, initial, + param_name: str, rbv: SharedPV | None = None) -> SharedPV: + """Create and register an RW PV with a parameter handler.""" + handler = _RWHandler(self._server, param_name, rbv) pv = SharedPV(handler=handler, nt=nt, initial=initial) + self._pvs[self._pv_name(suffix)] = pv + return pv + + def _add_action(self, suffix: str, action_name: str) -> SharedPV: + """Create and register an action PV (bool, triggers on True).""" + handler = _ActionHandler(self._server, action_name) + pv = SharedPV(handler=handler, nt=NTScalar('?'), initial=False) + self._pvs[self._pv_name(suffix)] = pv return pv def setup(self) -> None: """Create all PVs and start the PVA server.""" - p = self._prefix + chip = self._server.chip_config - # --- Status PVs (read-only) --- - self._pvs[f"{p}STATUS"] = self._make_status_pv( - NTScalar('as'), ['INITIALIZING']) - self._pvs[f"{p}ACQUIRING"] = self._make_status_pv( - NTScalar('?'), False) - self._pvs[f"{p}FRAME_RATE"] = self._make_status_pv( - NTScalar('d'), 0.0) - self._pvs[f"{p}FRAME_COUNT"] = self._make_status_pv( - NTScalar('l'), 0) - self._pvs[f"{p}EXPOSURE_MODE"] = self._make_status_pv( - NTScalar('i'), 0) - self._pvs[f"{p}TRIGGER_MODE"] = self._make_status_pv( - NTScalar('i'), 0) - self._pvs[f"{p}INTEGRATION_TIME"] = self._make_status_pv( - NTScalar('d'), 0.0) - self._pvs[f"{p}CAPTURE:STATUS"] = self._make_status_pv( - NTScalar('s'), "IDLE") - self._pvs[f"{p}CAPTURE:PROGRESS"] = self._make_status_pv( - NTScalar('i'), 0) - self._pvs[f"{p}CAPTURE:TOTAL"] = self._make_status_pv( - NTScalar('i'), 0) + # --- Acquisition control --- - # --- Command PVs (writable) --- - self._pvs[f"{p}CMD:EXPOSURE_MODE"] = self._make_command_pv( - NTScalar('i'), 0, "set_exposure_mode") - self._pvs[f"{p}CMD:TRIGGER_MODE"] = self._make_command_pv( - NTScalar('i'), 0, "set_trigger_mode") - self._pvs[f"{p}CMD:INTEGRATION_TIME"] = self._make_command_pv( - NTScalar('d'), 0.0, "set_integration_time") - self._pvs[f"{p}CMD:START_DAQ"] = self._make_command_pv( - NTScalar('i'), 0, "start_stop_daq") - self._pvs[f"{p}CMD:CAPTURE"] = self._make_command_pv( - NTScalar('i'), 0, "capture_frames") - self._pvs[f"{p}CMD:ADC_CLOCK_FREQ"] = self._make_command_pv( - NTScalar('d'), 60.0, "set_adc_clock_freq") - self._pvs[f"{p}CMD:ADC_DATA_DELAY"] = self._make_command_pv( - NTScalar('i'), 0, "set_adc_data_delay") - self._pvs[f"{p}CMD:PARAMETER_FILE"] = self._make_command_pv( - NTScalar('s'), "", "apply_parameter_file") - self._pvs[f"{p}CMD:RESET"] = self._make_command_pv( - NTScalar('i'), 0, "reset_connection") - self._pvs[f"{p}CMD:TEST_MODE"] = self._make_command_pv( - NTScalar('i'), 0, "set_test_mode") + # Acquire (bool) + acquire_rbv = self._add_ro("Acquire_RBV", NTScalar('?'), False) + self._add_rw("Acquire", NTScalar('?'), False, "acquire", acquire_rbv) - # --- Data PV --- - self._pvs[f"{p}IMAGE"] = SharedPV( - handler=self._ro_handler, + # AcquireTime (float, seconds — converted to ms internally) + acq_time_rbv = self._add_ro("AcquireTime_RBV", NTScalar('d'), 0.0) + self._add_rw("AcquireTime", NTScalar('d'), 0.0, + "acquire_time", acq_time_rbv) + + # ExposureMode (NTEnum) + em_initial = {"choices": EXPOSURE_MODE_CHOICES, "index": 0} + em_rbv = self._add_ro("ExposureMode_RBV", NTEnum(), em_initial) + self._add_rw("ExposureMode", NTEnum(), em_initial, + "exposure_mode", em_rbv) + + # TriggerMode (NTEnum) + tm_initial = {"choices": TRIGGER_MODE_CHOICES, "index": 0} + tm_rbv = self._add_ro("TriggerMode_RBV", NTEnum(), tm_initial) + self._add_rw("TriggerMode", NTEnum(), tm_initial, + "trigger_mode", tm_rbv) + + # --- Status (read-only) --- + + self._add_ro("ArrayCounter_RBV", NTScalar('l'), 0) + self._add_ro("FrameRate_RBV", NTScalar('d'), 0.0) + + # DetectorState (NTEnum, read-only) + state_initial = { + "choices": DETECTOR_STATE_CHOICES, + "index": DetectorState.DISCONNECTED, + } + self._add_ro("DetectorState_RBV", NTEnum(), state_initial) + + # Image dimensions + self._add_ro("MaxSizeX_RBV", NTScalar('i'), chip.image_nx) + self._add_ro("MaxSizeY_RBV", NTScalar('i'), chip.image_ny) + + # --- IMAGE (NTNDArray) --- + self._pvs[self._pv_name("IMAGE")] = SharedPV( + handler=self._ro, nt=NTNDArray(), - initial=np.zeros((self._server.chip_config.image_ny, - self._server.chip_config.image_nx), dtype=np.uint16), + initial=np.zeros((chip.image_ny, chip.image_nx), dtype=np.uint16), ) - # --- Peripheral PVs --- - self._pvs[f"{p}BELLOW:CMD"] = self._make_command_pv( - NTScalar('i'), 0, "bellow_move") - self._pvs[f"{p}BELLOW:POSITION"] = self._make_status_pv( - NTScalar('i'), 0) - self._pvs[f"{p}POWER:CMD"] = self._make_command_pv( - NTScalar('i'), 0, "power_control") - self._pvs[f"{p}POWER:VOLTAGES"] = self._make_status_pv( - NTScalar('ad'), np.zeros(0, dtype=np.float64)) + # --- Bellow --- + self._add_action("Bellow:Insert", "bellow_insert") + self._add_action("Bellow:Retract", "bellow_retract") + self._add_ro("Bellow:Position_RBV", NTScalar('i'), 0) - # Start the PVA server + # --- Detector power --- + power_rbv = self._add_ro("Power:Enable_RBV", NTScalar('?'), False) + self._add_rw("Power:Enable", NTScalar('?'), False, + "power_enable", power_rbv) + + # Per-channel voltage/current (created dynamically from config) + for ch in self._server.config.detector_power.channels: + self._add_ro(f"Power:Ch{ch}:Voltage_RBV", NTScalar('d'), 0.0) + self._add_ro(f"Power:Ch{ch}:Current_RBV", NTScalar('d'), 0.0) + + # --- TEC --- + tec_rbv = self._add_ro("TEC:Enable_RBV", NTScalar('?'), False) + self._add_rw("TEC:Enable", NTScalar('?'), False, + "tec_enable", tec_rbv) + + for ch in self._server.config.tec.channels: + self._add_ro(f"TEC:Ch{ch}:Voltage_RBV", NTScalar('d'), 0.0) + self._add_ro(f"TEC:Ch{ch}:Current_RBV", NTScalar('d'), 0.0) + + # Start PVA server self._pva_server = Server(providers=[self._pvs]) logger.info("PVA server started with %d PVs (prefix=%s)", len(self._pvs), self._prefix) - def post_status(self, name: str, value: Any) -> None: - """ - Update a status PV. - - Args: - name: PV name suffix (without prefix), e.g., 'FRAME_RATE'. - value: New value to post. - """ - full_name = f"{self._prefix}{name}" - pv = self._pvs.get(full_name) - if pv: + def post(self, suffix: str, value: Any) -> None: + """Update a PV value by suffix (e.g., 'FrameRate_RBV').""" + pv = self._pvs.get(self._pv_name(suffix)) + if pv is not None: try: pv.post(value) except Exception as e: - logger.debug("Could not post to %s: %s", full_name, e) + logger.debug("Could not post to %s: %s", suffix, e) + + def post_enum(self, suffix: str, index: int, choices: list[str]) -> None: + """Update an NTEnum PV.""" + self.post(suffix, {"choices": choices, "index": index}) + + def post_state(self, state: DetectorState) -> None: + """Update the DetectorState_RBV PV.""" + self.post_enum("DetectorState_RBV", state.value, DETECTOR_STATE_CHOICES) + + def post_exposure_mode(self, mode: int) -> None: + """Update ExposureMode RBVs.""" + val = {"choices": EXPOSURE_MODE_CHOICES, "index": mode} + self.post("ExposureMode_RBV", val) + self.post("ExposureMode", val) + + def post_trigger_mode(self, mode: int) -> None: + """Update TriggerMode RBVs.""" + val = {"choices": TRIGGER_MODE_CHOICES, "index": mode} + self.post("TriggerMode_RBV", val) + self.post("TriggerMode", val) def stop(self) -> None: """Stop the PVA server.""" diff --git a/src/team1k/server.py b/src/team1k/server.py index 9a22d8b..8cd9726 100644 --- a/src/team1k/server.py +++ b/src/team1k/server.py @@ -2,18 +2,19 @@ """ Team1k Server — Controls the Team1k X-ray detector system. -Communicates directly with the detector via UDP (replacing k_test C++ program). -Uses a dedicated subprocess for high-throughput data acquisition, shared memory -ring buffer for data transfer, and EPICS PV Access for all client communication. +Communicates directly with the detector via UDP. Uses a dedicated subprocess +for high-throughput data acquisition, shared memory ring buffer for data +transfer, and EPICS PV Access for the image stream and basic controls. +Python clients connect via TCP for full control and frame capture. Architecture: Main process: - Detector register read/write (UDP port 42000) - - PVA server (commands, status, data) + - PVA server (areaDetector-style PVs + IMAGE stream) - PVA streamer thread (shmring -> NTNDArray) - - Frame capture thread (shmring -> temp file, on demand) - - ZMQ data transfer server (temp file -> client) - - Peripheral control (bellow stage, power supply) + - TCP client server (control + buffered frame capture) + - Power supply / TEC monitoring + - Auto-reconnect on detector errors Acquisition subprocess: - UDP data reception (port 41000) @@ -29,269 +30,360 @@ import argparse import threading from typing import Any +from .config import Team1kConfig, load_config +from .state import DetectorState + from .detector.registers import RegisterInterface from .detector.commands import DetectorCommands from .detector.adc import ADCController from .detector.chip_config import TEAM1K_CONFIG, configure_chip -from .detector.parameter_file import apply_parameter_file from .acquisition.receiver import AcquisitionProcess -from .filewriter.capture import FrameCapture, DataTransferServer from .pva.interface import PVAInterface from .pva.streamer import PVAStreamer +from .tcp_server import TCPClientServer from .peripherals.bellow_stage import BellowStage -from .peripherals.power_supply import PowerSupply +from .peripherals.power_supply import DetectorPowerSupply +from .peripherals.tec import TECController logger = logging.getLogger(__name__) -# Default configuration -DEFAULT_DETECTOR_IP = "10.0.0.32" -DEFAULT_REGISTER_PORT = 42000 -DEFAULT_DATA_PORT = 41000 -DEFAULT_PV_PREFIX = "TEAM1K:" -DEFAULT_BELLOW_PORT = "/dev/CameraBellowStage" -DEFAULT_ZMQ_PORT = 42005 - class Team1kServer: """ Main server coordinating all subsystems. Manages direct UDP communication with the detector, data acquisition - via a dedicated subprocess, PVA streaming, and on-demand frame capture - with ZMQ data transfer to remote clients. + via a dedicated subprocess, PVA streaming, TCP client handling, and + peripheral control. Designed to be resilient to detector errors. """ - def __init__(self, detector_ip: str = DEFAULT_DETECTOR_IP, - register_port: int = DEFAULT_REGISTER_PORT, - data_port: int = DEFAULT_DATA_PORT, - pv_prefix: str = DEFAULT_PV_PREFIX, - config_file: str | None = None, - bellow_port: str = DEFAULT_BELLOW_PORT, - zmq_port: int = DEFAULT_ZMQ_PORT): - - self.detector_ip = detector_ip - self.register_port = register_port - self.data_port = data_port + def __init__(self, config: Team1kConfig): + self.config = config self.chip_config = TEAM1K_CONFIG - self._config_file = config_file self._shutdown_event = threading.Event() - self._acquiring = False + self._state_lock = threading.Lock() + self._state = DetectorState.DISCONNECTED - # Detector communication (main process, register port) - self.registers = RegisterInterface(detector_ip, register_port) - self.commands = DetectorCommands(self.registers) - self.adc = ADCController(self.registers) + # Cached parameter values (avoids register reads) + self._params = { + "exposure_mode": config.exposure_mode, + "trigger_mode": config.trigger_mode, + "trigger_polarity": config.trigger_polarity, + "integration_time": config.integration_time_ms, + "frame_rate": 0.0, + "frame_count": 0, + } + + # Detector communication + self.registers: RegisterInterface | None = None + self.commands: DetectorCommands | None = None + self.adc: ADCController | None = None # Acquisition subprocess self.acquisition = AcquisitionProcess( - detector_ip, data_port, + config.detector_ip, config.data_port, ring_name="team1k_frames", num_ring_slots=32, chip_config=self.chip_config, ) # PVA interface - self.pva = PVAInterface(self, prefix=pv_prefix) + self.pva = PVAInterface(self, prefix=config.pv_prefix) - # Frame capture + ZMQ data transfer (replaces local file writing) - self.frame_capture = FrameCapture("team1k_frames", self.chip_config) - self.data_server = DataTransferServer( - self.frame_capture, self.chip_config, port=zmq_port, - ) - - # PVA streamer will be created after PVA setup + # PVA streamer (created after PVA setup) self._pva_streamer: PVAStreamer | None = None + # TCP client server + self.tcp_server = TCPClientServer(self, port=config.client_port) + # Peripherals - self.bellow_stage = BellowStage(bellow_port) - self.power_supply = PowerSupply() + self.bellow_stage = BellowStage(config.bellow_stage) + self.detector_power = DetectorPowerSupply(config.detector_power) + self.tec = TECController(config.tec) - def initialize(self) -> None: - """ - Initialize the detector. + @property + def state(self) -> DetectorState: + with self._state_lock: + return self._state - Equivalent to the KTestFunctions constructor sequence: - 1. Read firmware version - 2. Configure chip type - 3. Set default exposure mode, trigger, integration time - 4. Set ADC clock frequency - 5. Set ADC data delay - 6. Optionally apply parameter file - """ - logger.info("Initializing detector at %s:%d", self.detector_ip, self.register_port) + @state.setter + def state(self, new_state: DetectorState) -> None: + with self._state_lock: + old = self._state + self._state = new_state + if old != new_state: + logger.info("State: %s -> %s", old.name, new_state.name) + try: + self.pva.post_state(new_state) + except Exception: + pass - # Read firmware version + def _connect_detector(self) -> bool: + """Create register interface and command objects. Returns success.""" try: - version = self.commands.read_firmware_version() - logger.info("Firmware version: %d", version) + if self.registers is not None: + try: + self.registers.close() + except Exception: + pass + + self.registers = RegisterInterface( + self.config.detector_ip, self.config.register_port, + ) + self.commands = DetectorCommands(self.registers) + self.adc = ADCController(self.registers) + return True except Exception as e: - logger.warning("Could not read firmware version: %s", e) + logger.error("Failed to connect to detector: %s", e) + return False - # Configure chip - configure_chip(self.registers, self.chip_config) - - # Set defaults (matching KTestFunctions constructor) - self.commands.set_exposure_mode(3) # Global shutter CDS - self.commands.set_trigger_mode(False) # Internal trigger - self.commands.set_integration_time(6.0) # 6 ms - self.adc.set_clock_freq(60.0) # 60 MHz - self.commands.set_adc_data_delay(0x1A7) - self.commands.set_adc_data_averaging(0) - self.commands.enable_fpga_test_data(False) - - # Apply parameter file if provided - if self._config_file: - ok, msg = apply_parameter_file( - self._config_file, self.commands, self.adc, self.registers, - restart_daq_callback=lambda: self._restart_daq() - ) - if not ok: - logger.error("Parameter file error:\n%s", msg) - else: - logger.info("Parameter file applied:\n%s", msg) - - logger.info("Detector initialized") - - def execute_command(self, command: str, value: Any) -> Any: + def initialize(self) -> bool: """ - Dispatch a command from a PVA put handler. + Initialize the detector with config values. - This is called from p4p's internal thread, so it must be thread-safe. - Register operations are protected by the RegisterInterface's lock. + Returns True on success, False on error (state set to ERROR). """ - logger.info("Executing command: %s = %s", command, value) + self.state = DetectorState.INITIALIZING - if command == "set_exposure_mode": - self.commands.set_exposure_mode(int(value)) - self.pva.post_status("EXPOSURE_MODE", int(value)) + if not self._connect_detector(): + self.state = DetectorState.ERROR + return False - elif command == "set_trigger_mode": - self.commands.set_trigger_mode(external=bool(int(value))) - self.pva.post_status("TRIGGER_MODE", int(value)) + try: + # Firmware version + try: + version = self.commands.read_firmware_version() + logger.info("Firmware version: %d", version) + except Exception as e: + logger.warning("Could not read firmware version: %s", e) - elif command == "set_integration_time": - self.commands.set_integration_time(float(value)) - self.pva.post_status("INTEGRATION_TIME", float(value)) + # Chip configuration + configure_chip(self.registers, self.chip_config) - elif command == "start_stop_daq": - if int(value): - self._start_daq() - else: - self._stop_daq() - - elif command == "capture_frames": - num = int(value) - if num <= 0: - raise ValueError(f"Invalid frame count: {num}") - self.frame_capture.capture(num) - self.pva.post_status("CAPTURE:STATUS", "CAPTURING") - self.pva.post_status("CAPTURE:TOTAL", num) - self.pva.post_status("CAPTURE:PROGRESS", 0) - - elif command == "set_adc_clock_freq": - self.adc.set_clock_freq(float(value)) - - elif command == "set_adc_data_delay": - self.commands.set_adc_data_delay(int(value)) - - elif command == "apply_parameter_file": - ok, msg = apply_parameter_file( - str(value), self.commands, self.adc, self.registers, - restart_daq_callback=lambda: self._restart_daq() + # Apply config defaults + self.commands.set_exposure_mode(self.config.exposure_mode) + self.commands.set_trigger_mode( + external=self.config.trigger_external, + polarity=not self.config.trigger_polarity_rising, ) - if not ok: - raise RuntimeError(msg) - self.pva.post_status("STATUS", [msg]) + self.commands.set_integration_time(self.config.integration_time_ms) + self.adc.set_clock_freq(self.config.adc_clock_frequency_mhz) + self.commands.set_adc_data_delay(self.config.adc_data_delay) + self.commands.set_adc_data_averaging(0) + self.commands.enable_fpga_test_data(False) - elif command == "reset_connection": - if int(value): - self._stop_daq() - self.registers.close() - self.registers = RegisterInterface(self.detector_ip, self.register_port) - self.commands = DetectorCommands(self.registers) - self.adc = ADCController(self.registers) - self.initialize() + # ADC order registers + self.registers.write_register(30, self.config.adc_order_7to0) + self.registers.write_register(31, self.config.adc_order_15to8) - elif command == "set_test_mode": - self.commands.enable_fpga_test_data(bool(int(value))) + # Digital signal registers + self.registers.write_register(27, self.config.digital_polarity) + self.registers.write_register(28, self.config.digital_order_7to0) + self.registers.write_register(29, self.config.digital_order_15to8) - elif command == "bellow_move": - insert = bool(int(value)) - success, error = self.bellow_stage.move(insert) + # Update cached params + self._params["exposure_mode"] = self.config.exposure_mode + self._params["trigger_mode"] = ( + 1 if self.config.trigger_external else 0 + ) + self._params["integration_time"] = self.config.integration_time_ms + + self.state = DetectorState.IDLE + logger.info("Detector initialized") + return True + + except Exception as e: + logger.error("Initialization failed: %s", e) + self.state = DetectorState.ERROR + return False + + def get_parameter(self, key: str) -> Any: + """Get a cached parameter value.""" + if key in self._params: + return self._params[key] + raise ValueError(f"Unknown parameter: {key}") + + def apply_parameter(self, name: str, value: Any) -> None: + """ + Apply a parameter change. Used by both PVA and TCP interfaces. + + Writes to the detector, updates the cache, and posts PVA RBVs. + """ + if self.commands is None: + raise RuntimeError("Detector not connected") + + try: + if name == "exposure_mode": + mode = int(value) + self.commands.set_exposure_mode(mode) + self._params["exposure_mode"] = mode + self.pva.post_exposure_mode(mode) + + elif name == "trigger_mode" or name == "trigger_polarity": + if name == "trigger_mode": + mode = int(value) + polarity = self.config.trigger_polarity + else: + mode = self.config.trigger_mode + polarity = int(value) + self.commands.set_trigger_mode(external=bool(mode), polarity=bool(polarity)) + self._params["trigger_mode"] = mode + self._params["trigger_polarity"] = polarity + self.pva.post_trigger_mode(mode) + + elif name in ("integration_time", "acquire_time"): + # PVA sends seconds (areaDetector convention), TCP sends ms + ms = float(value) + if name == "acquire_time": + ms = float(value) * 1000.0 # seconds to ms + self.commands.set_integration_time(ms) + self._params["integration_time"] = ms + self.pva.post("AcquireTime_RBV", ms / 1000.0) + + elif name == "acquire": + if bool(value): + self._start_daq() + else: + self._stop_daq() + + elif name == "power_enable": + if bool(value): + self.detector_power.power_on() + else: + self.detector_power.power_off() + self.pva.post("Power:Enable_RBV", bool(value)) + + elif name == "tec_enable": + if bool(value): + self.tec.power_on() + else: + self.tec.power_off() + self.pva.post("TEC:Enable_RBV", bool(value)) + + else: + raise ValueError(f"Unknown parameter: {name}") + + except Exception as e: + # On register errors, transition to ERROR + if "register" in str(e).lower() or "udp" in str(e).lower(): + self.state = DetectorState.ERROR + raise + + def execute_action(self, action: str) -> None: + """Execute a one-shot action (bellow, power, etc.).""" + if action == "bellow_insert": + success, error = self.bellow_stage.move(insert=True) if not success: raise RuntimeError(error) - self.pva.post_status("BELLOW:POSITION", self.bellow_stage.position) + self.pva.post("Bellow:Position_RBV", self.bellow_stage.position) - elif command == "power_control": - if int(value): - success, error = self.power_supply.turn_on() - else: - success, error = self.power_supply.turn_off() + elif action == "bellow_retract": + success, error = self.bellow_stage.move(insert=False) if not success: raise RuntimeError(error) + self.pva.post("Bellow:Position_RBV", self.bellow_stage.position) + + elif action == "power_on": + self.detector_power.power_on() + self.pva.post("Power:Enable_RBV", True) + + elif action == "power_off": + self.detector_power.power_off() + self.pva.post("Power:Enable_RBV", False) + + elif action == "tec_on": + self.tec.power_on() + self.pva.post("TEC:Enable_RBV", True) + + elif action == "tec_off": + self.tec.power_off() + self.pva.post("TEC:Enable_RBV", False) else: - raise ValueError(f"Unknown command: {command}") + raise ValueError(f"Unknown action: {action}") - return True + def reconnect(self) -> None: + """Reconnect to detector after power cycle or error.""" + logger.info("Reconnecting to detector...") + self._stop_daq() + if self.initialize(): + self._start_daq() def _start_daq(self) -> None: """Start data acquisition.""" - if self._acquiring: + if self.state == DetectorState.ACQUIRING: return - # Start the acquisition subprocess if not running + if self.commands is None: + raise RuntimeError("Detector not connected") + if not self.acquisition.is_alive: self.acquisition.start_process() - time.sleep(0.5) # Let subprocess initialize + time.sleep(0.5) - # Start data flow on the detector self.commands.stop_data_flow() time.sleep(0.01) self.commands.start_data_flow() - - # Tell subprocess to start receiving self.acquisition.start_acquisition() - self._acquiring = True - self.pva.post_status("ACQUIRING", True) - self.pva.post_status("STATUS", ["ACQUIRING"]) + self.state = DetectorState.ACQUIRING + self.pva.post("Acquire_RBV", True) logger.info("DAQ started") def _stop_daq(self) -> None: """Stop data acquisition.""" - if not self._acquiring: + if self.state != DetectorState.ACQUIRING: return - # Stop data flow on detector - self.commands.stop_data_flow() + try: + if self.commands: + self.commands.stop_data_flow() + self.acquisition.stop_acquisition() + except Exception as e: + logger.warning("Error stopping DAQ: %s", e) - # Stop acquisition subprocess - self.acquisition.stop_acquisition() - - self._acquiring = False - self.pva.post_status("ACQUIRING", False) - self.pva.post_status("STATUS", ["IDLE"]) + self.state = DetectorState.IDLE + self.pva.post("Acquire_RBV", False) logger.info("DAQ stopped") - def _restart_daq(self) -> bool: - """Restart DAQ (stop then start).""" - self._stop_daq() - time.sleep(0.1) - self._start_daq() - return True + def _auto_reconnect_loop(self) -> None: + """Background thread: auto-reconnect when in ERROR state.""" + interval = self.config.reconnect_interval + while not self._shutdown_event.is_set(): + if self.state == DetectorState.ERROR: + logger.info("Auto-reconnect: attempting...") + try: + self.reconnect() + except Exception as e: + logger.warning("Auto-reconnect failed: %s", e) + self._shutdown_event.wait(interval) def _status_update_loop(self) -> None: """Periodically update status PVs.""" while not self._shutdown_event.is_set(): try: if self._pva_streamer: - self.pva.post_status("FRAME_RATE", self._pva_streamer.frame_rate) - self.pva.post_status("FRAME_COUNT", self._pva_streamer.frame_count) + rate = self._pva_streamer.frame_rate + count = self._pva_streamer.frame_count + self._params["frame_rate"] = rate + self._params["frame_count"] = count + self.pva.post("FrameRate_RBV", rate) + self.pva.post("ArrayCounter_RBV", count) + + # Power supply status + for ch, readings in self.detector_power.get_status().items(): + self.pva.post(f"Power:Ch{ch}:Voltage_RBV", + readings.voltage) + self.pva.post(f"Power:Ch{ch}:Current_RBV", + readings.current) + + for ch, readings in self.tec.get_status().items(): + self.pva.post(f"TEC:Ch{ch}:Voltage_RBV", + readings.voltage) + self.pva.post(f"TEC:Ch{ch}:Current_RBV", + readings.current) - self.pva.post_status("CAPTURE:STATUS", self.frame_capture.status) - self.pva.post_status("CAPTURE:PROGRESS", self.frame_capture.num_captured) except Exception as e: logger.debug("Status update error: %s", e) @@ -302,22 +394,28 @@ class Team1kServer: Run the server. 1. Initialize detector - 2. Start PVA server - 3. Start acquisition subprocess - 4. Start file writer and PVA streamer threads + 2. Start PVA server + streamer + 3. Start acquisition (DAQ runs continuously) + 4. Start TCP client server 5. Block until shutdown signal """ - # Initialize + # Initialize detector self.initialize() - # Set up PVA server + # PVA server self.pva.setup() - # Start acquisition subprocess + # Update initial PVA values + self.pva.post_exposure_mode(self._params["exposure_mode"]) + self.pva.post_trigger_mode(self._params["trigger_mode"]) + self.pva.post("AcquireTime_RBV", + self._params["integration_time"] / 1000.0) + + # Acquisition subprocess self.acquisition.start_process() time.sleep(0.5) - # Start PVA streamer thread + # PVA streamer self._pva_streamer = PVAStreamer( self.acquisition.ring_name, self.pva.image_pv, @@ -325,22 +423,32 @@ class Team1kServer: ) self._pva_streamer.start() - # Start frame capture + ZMQ data transfer threads - self.frame_capture.start() - self.data_server.start() + # TCP client server + self.tcp_server.start() - # Start status update thread - status_thread = threading.Thread(target=self._status_update_loop, - daemon=True, name="team1k-status") - status_thread.start() + # Status update thread + threading.Thread( + target=self._status_update_loop, + daemon=True, name="team1k-status", + ).start() - # Update initial status - self.pva.post_status("STATUS", ["READY"]) - self.pva.post_status("EXPOSURE_MODE", self.commands.exposure_mode) + # Auto-reconnect thread (if enabled) + if self.config.auto_reconnect: + threading.Thread( + target=self._auto_reconnect_loop, + daemon=True, name="team1k-reconnect", + ).start() + + # Start DAQ automatically (continuous mode) + if self.state == DetectorState.IDLE: + try: + self._start_daq() + except Exception as e: + logger.error("Failed to start DAQ: %s", e) logger.info("Server running. Press Ctrl+C to stop.") - # Set up signal handlers + # Signal handlers def _handle_signal(signum, _frame): logger.info("Received signal %d, shutting down...", signum) self._shutdown_event.set() @@ -350,8 +458,6 @@ class Team1kServer: # Block until shutdown self._shutdown_event.wait() - - # Shutdown sequence self.shutdown() def shutdown(self) -> None: @@ -359,26 +465,17 @@ class Team1kServer: logger.info("Shutting down server...") self._shutdown_event.set() - # Stop DAQ - if self._acquiring: - self._stop_daq() + self._stop_daq() - # Stop PVA streamer if self._pva_streamer: self._pva_streamer.stop() - # Stop frame capture + data transfer - self.frame_capture.stop() - self.data_server.stop() - - # Shutdown acquisition subprocess + self.tcp_server.stop() self.acquisition.shutdown() - - # Stop PVA server self.pva.stop() - # Close register interface - self.registers.close() + if self.registers: + self.registers.close() logger.info("Server shutdown complete") @@ -386,39 +483,36 @@ class Team1kServer: def main(): """Main entry point for the server.""" parser = argparse.ArgumentParser(description="Team1k Detector Server") - parser.add_argument('--detector-ip', default=DEFAULT_DETECTOR_IP, - help=f"Detector IP address (default: {DEFAULT_DETECTOR_IP})") - parser.add_argument('--register-port', type=int, default=DEFAULT_REGISTER_PORT, - help=f"Detector register port (default: {DEFAULT_REGISTER_PORT})") - parser.add_argument('--data-port', type=int, default=DEFAULT_DATA_PORT, - help=f"Detector data port (default: {DEFAULT_DATA_PORT})") - parser.add_argument('--pv-prefix', default=DEFAULT_PV_PREFIX, - help=f"PVA prefix (default: {DEFAULT_PV_PREFIX})") - parser.add_argument('--config', default=None, - help="Parameter file to apply on startup") - parser.add_argument('--bellow-port', default=DEFAULT_BELLOW_PORT, - help=f"Bellow stage serial port (default: {DEFAULT_BELLOW_PORT})") - parser.add_argument('--zmq-port', type=int, default=DEFAULT_ZMQ_PORT, - help=f"ZMQ data transfer port (default: {DEFAULT_ZMQ_PORT})") - parser.add_argument('--log-level', default='INFO', - choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], + parser.add_argument("--config", default=None, + help="YAML config file path") + parser.add_argument("--detector-ip", default=None, + help="Override detector IP address") + parser.add_argument("--pv-prefix", default=None, + help="Override PVA prefix") + parser.add_argument("--client-port", type=int, default=None, + help="Override TCP client port") + parser.add_argument("--log-level", default="INFO", + choices=["DEBUG", "INFO", "WARNING", "ERROR"], help="Logging level (default: INFO)") args = parser.parse_args() logging.basicConfig( level=getattr(logging, args.log_level), - format='%(asctime)s %(name)-30s %(levelname)-8s %(message)s', + format="%(asctime)s %(name)-30s %(levelname)-8s %(message)s", ) - server = Team1kServer( - detector_ip=args.detector_ip, - register_port=args.register_port, - data_port=args.data_port, - pv_prefix=args.pv_prefix, - config_file=args.config, - bellow_port=args.bellow_port, - zmq_port=args.zmq_port, - ) + # Load config + config = load_config(args.config) + + # Apply CLI overrides + if args.detector_ip: + config.detector_ip = args.detector_ip + if args.pv_prefix: + config.pv_prefix = args.pv_prefix + if args.client_port: + config.client_port = args.client_port + + server = Team1kServer(config) try: server.run() diff --git a/src/team1k/state.py b/src/team1k/state.py new file mode 100644 index 0000000..e254f0a --- /dev/null +++ b/src/team1k/state.py @@ -0,0 +1,30 @@ +""" +Detector state enum. + +Tracks the current state of the detector connection and acquisition. +Used by the server, PVA interface, and TCP client handler. +""" + +import enum + + +class DetectorState(enum.IntEnum): + """Detector connection and acquisition state.""" + DISCONNECTED = 0 # Detector powered off or unreachable + INITIALIZING = 1 # Configuring registers + IDLE = 2 # Ready, DAQ not running + ACQUIRING = 3 # DAQ running, streaming frames + ERROR = 4 # Register error, needs reconnect + + @property + def label(self) -> str: + """Human-readable label for PVA NTEnum.""" + return self.name.capitalize() + + +# Labels for NTEnum PV choices (index matches IntEnum value) +DETECTOR_STATE_CHOICES = [s.label for s in DetectorState] + +EXPOSURE_MODE_CHOICES = ["Rolling", "Global", "GlobalFlush", "GlobalCDS"] + +TRIGGER_MODE_CHOICES = ["Internal", "External"] diff --git a/src/team1k/tcp_protocol.py b/src/team1k/tcp_protocol.py new file mode 100644 index 0000000..780f0e0 --- /dev/null +++ b/src/team1k/tcp_protocol.py @@ -0,0 +1,71 @@ +""" +TCP message framing protocol. + +Shared by tcp_server.py (server side) and client.py (client side). +No external dependencies — stdlib only. + +Message types: + - JSON messages: 1-byte type + 4-byte big-endian length prefix + UTF-8 JSON + - Binary data: 1-byte type + 8-byte big-endian length prefix + raw bytes +""" + +import json +import struct +import socket +from typing import Union + +# Length prefix formats +_JSON_HEADER = struct.Struct("!I") # 4-byte unsigned int (max ~4 GB) +_DATA_HEADER = struct.Struct("!Q") # 8-byte unsigned long (supports >4 GB) + +def send_msg(sock: socket.socket, obj: dict) -> None: + """Send a JSON-serializable dict with 4-byte length prefix.""" + data = json.dumps(obj, separators=(",", ":")).encode("utf-8") + sock.sendall(b"J" + _JSON_HEADER.pack(len(data)) + data) + +def recv_msg(sock: socket.socket) -> dict: + """Receive a length-prefixed JSON message. Returns parsed dict.""" + data = recv_bytes_or_msg(sock) + if isinstance(data, bytes): + raise ValueError("Expected JSON message but got binary data") + return data + +def send_bytes(sock: socket.socket, data: bytes | memoryview) -> None: + """Send raw bytes with 8-byte length prefix.""" + sock.sendall(b"D" + _DATA_HEADER.pack(len(data)) + data) + + +def recv_bytes_or_msg(sock: socket.socket) -> Union[bytes | dict]: + """Receive length-prefixed binary data (reads the 8-byte header first).""" + type_byte = _recv_exact(sock, 1) + if type_byte == b"J": + # JSON message + raw_len = _recv_exact(sock, _JSON_HEADER.size) + (length,) = _JSON_HEADER.unpack(raw_len) + data = _recv_exact(sock, length) + return json.loads(data) + elif type_byte == b"D": + # Binary data + raw_len = _recv_exact(sock, _DATA_HEADER.size) + (length,) = _DATA_HEADER.unpack(raw_len) + return _recv_exact(sock, length) + else: + raise ValueError(f"Unexpected message type: {type_byte}") + +def recv_exact(sock: socket.socket, n: int) -> bytes: + """Receive exactly n bytes from a socket. Public API.""" + return _recv_exact(sock, n) + +def _recv_exact(sock: socket.socket, n: int) -> bytes: + """Receive exactly n bytes, raising ConnectionError on premature close.""" + buf = bytearray(n) + view = memoryview(buf) + pos = 0 + while pos < n: + nbytes = sock.recv_into(view[pos:]) + if nbytes == 0: + raise ConnectionError( + f"Connection closed (received {pos}/{n} bytes)" + ) + pos += nbytes + return bytes(buf) diff --git a/src/team1k/tcp_server.py b/src/team1k/tcp_server.py new file mode 100644 index 0000000..3a7482d --- /dev/null +++ b/src/team1k/tcp_server.py @@ -0,0 +1,253 @@ +""" +TCP server for Python client connections. + +Handles control commands (get/set parameters, register access) +and frame capture requests. Each client gets its own thread. +Only one capture can run at a time (capture lock). +""" + +import socket +import logging +import threading +from typing import TYPE_CHECKING, Any + +from .tcp_protocol import send_msg, recv_msg +from .capture import BufferedCapture +from .state import DetectorState + +if TYPE_CHECKING: + from .server import Team1kServer + +logger = logging.getLogger(__name__) + + +class TCPClientServer(threading.Thread): + """ + TCP server accepting client connections for control and data. + + Each client connection is handled in a separate daemon thread. + Control commands are dispatched to server.apply_parameter() or + server.execute_action(). Captures use BufferedCapture. + """ + + def __init__(self, server: 'Team1kServer', port: int = 42010): + super().__init__(daemon=True, name="team1k-tcp-server") + self._server = server + self._port = port + self._capture_lock = threading.Lock() + self._shutdown = threading.Event() + self._sock: socket.socket | None = None + + def run(self) -> None: + self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self._sock.bind(("0.0.0.0", self._port)) + self._sock.listen(5) + self._sock.settimeout(0.5) + + logger.info("TCP client server listening on port %d", self._port) + + while not self._shutdown.is_set(): + try: + client_sock, addr = self._sock.accept() + except socket.timeout: + continue + + logger.info("Client connected from %s", addr) + t = threading.Thread( + target=self._handle_client, + args=(client_sock, addr), + daemon=True, + name=f"team1k-client-{addr[0]}:{addr[1]}", + ) + t.start() + + self._sock.close() + + def _handle_client(self, sock: socket.socket, addr: tuple) -> None: + """Handle one client connection.""" + sock.settimeout(300) # 5 min idle timeout + try: + while not self._shutdown.is_set(): + try: + msg = recv_msg(sock) + except (ConnectionError, socket.timeout): + break + + try: + self._dispatch(sock, msg) + except Exception as e: + logger.error("Command error from %s: %s", addr, e) + try: + send_msg(sock, {"ok": False, "error": str(e)}) + except ConnectionError: + break + finally: + sock.close() + logger.info("Client disconnected: %s", addr) + + def _dispatch(self, sock: socket.socket, msg: dict) -> None: + """Dispatch a client command.""" + cmd = msg.get("cmd", "") + + if cmd == "get": + key = msg["key"] + value = self._server.get_parameter(key) + send_msg(sock, {"ok": True, "value": value}) + + elif cmd == "set": + key = msg["key"] + value = msg["value"] + self._server.apply_parameter(key, value) + send_msg(sock, {"ok": True}) + + elif cmd == "status": + send_msg(sock, { + "ok": True, + "state": self._server.state.name, + "exposure_mode": self._server.get_parameter("exposure_mode"), + "trigger_mode": self._server.get_parameter("trigger_mode"), + "integration_time": self._server.get_parameter("integration_time"), + "frame_rate": self._server.get_parameter("frame_rate"), + "frame_count": self._server.get_parameter("frame_count"), + "acquiring": self._server.state == DetectorState.ACQUIRING, + "power_on": self._server.detector_power.is_on, + "tec_on": self._server.tec.is_on, + "bellow_inserted": abs(self._server.bellow_stage.position - self._server.config.bellow_stage.inserted_position_um) < 100, + }) + + elif cmd == "capture": + if not self._capture_lock.acquire(blocking=False): + send_msg(sock, { + "ok": False, + "error": "Another capture is in progress", + }) + return + try: + sock.settimeout(None) # no timeout during capture + capture = BufferedCapture( + self._server.acquisition.ring_name, + self._server.chip_config, + ) + if "num_frames" in msg: + capture.capture_to_socket(sock, int(msg["num_frames"])) + elif "duration_sec" in msg: + capture.capture_duration_to_socket( + sock, float(msg["duration_sec"]), + ) + else: + send_msg(sock, { + "ok": False, + "error": "Specify num_frames or duration_sec", + }) + finally: + sock.settimeout(300) + self._capture_lock.release() + + elif cmd == "start_capture": + if not self._capture_lock.acquire(blocking=False): + send_msg(sock, { + "ok": False, + "error": "Another capture is in progress", + }) + return + try: + sock.settimeout(None) + capture = BufferedCapture( + self._server.acquisition.ring_name, + self._server.chip_config, + ) + handle = capture.start_streaming_to_socket(sock) + # Block until the streaming is stopped + # (client sends stop_capture, which we handle below) + # Actually, streaming runs in its own thread and sends + # data until handle.stop_event is set. We store the handle + # and wait for a stop_capture command. + self._active_stream = handle + # Don't send response here — the streaming thread already + # sent the header. We just wait for stop_capture. + # The client connection loop is paused while streaming. + handle.stop_event.wait() + finally: + sock.settimeout(300) + self._capture_lock.release() + self._active_stream = None + + elif cmd == "stop_capture": + handle = getattr(self, "_active_stream", None) + if handle: + handle.stop_event.set() + send_msg(sock, {"ok": True}) + else: + send_msg(sock, {"ok": False, "error": "No active capture"}) + + elif cmd == "register_read": + addr = int(msg["address"]) + value = self._server.registers.read_register(addr) + send_msg(sock, {"ok": True, "value": value}) + + elif cmd == "register_write": + addr = int(msg["address"]) + value = int(msg["value"]) + self._server.registers.write_register(addr, value) + send_msg(sock, {"ok": True}) + + elif cmd == "reconnect": + self._server.reconnect() + send_msg(sock, {"ok": True}) + + elif cmd == "insert_detector": + self._server.execute_action("bellow_insert") + send_msg(sock, {"ok": True}) + + elif cmd == "retract_detector": + self._server.execute_action("bellow_retract") + send_msg(sock, {"ok": True}) + + elif cmd == "power_on": + self._server.execute_action("power_on") + send_msg(sock, {"ok": True}) + + elif cmd == "power_off": + self._server.execute_action("power_off") + send_msg(sock, {"ok": True}) + + elif cmd == "tec_on": + self._server.execute_action("tec_on") + send_msg(sock, {"ok": True}) + + elif cmd == "tec_off": + self._server.execute_action("tec_off") + send_msg(sock, {"ok": True}) + + elif cmd == "power_status": + status = self._server.detector_power.get_status() + # Convert int keys to strings for JSON + send_msg(sock, { + "ok": True, + "channels": {str(k): dict(v) for k, v in status.items()}, + }) + + elif cmd == "tec_status": + status = self._server.tec.get_status() + send_msg(sock, { + "ok": True, + "channels": {str(k): dict(v) for k, v in status.items()}, + }) + + elif cmd == "bellow_status": + position = self._server.bellow_stage.position + is_moving = self._server.bellow_stage.is_moving + send_msg(sock, { + "ok": True, + "position_um": position, + "is_moving": is_moving, + "bellow_inserted": abs(position - self._server.config.bellow_stage.inserted_position_um) < 100, + }) + + else: + send_msg(sock, {"ok": False, "error": f"Unknown command: {cmd}"}) + + def stop(self) -> None: + """Signal shutdown.""" + self._shutdown.set()