More clauding

This commit is contained in:
2026-03-05 13:19:12 -06:00
parent 5a21f38dab
commit 29e80e74cd
29 changed files with 2380 additions and 1573 deletions

2
.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
__pycache__/
*.egg-info/

274
README.md
View File

@@ -2,22 +2,24 @@
Python control library and server for the TEAM1k X-ray detector.
Communicates directly with the detector FPGA via UDP, replacing the previous `k_test` C++ program. Uses EPICS PV Access (PVA) for command/status and ZMQ for bulk data transfer.
Communicates directly with the detector FPGA via UDP. Uses EPICS PV Access (PVA) for the image stream and basic controls (areaDetector-style), and a TCP protocol for the Python client.
## Architecture
```
Client machine Detector machine
+--------------+ +-------------------------------+
| | PVA (commands/status) | team1k-server |
| Client |<---------------------->| Register I/O (UDP:42000) |
| | | PVA server |
| | ZMQ (frame data) | PVA streamer thread |
| |<---------------------->| Frame capture thread |
| | | ZMQ data transfer thread |
+--------------+ | |
| Acquisition subprocess |
| UDP data recv (UDP:41000) |
| | TCP (control+data) | team1k-server |
| Client |<--------------------->| Register I/O (UDP:42000) |
| (Python) | | TCP client server |
| | | Buffered frame capture |
+--------------+ | PVA server (areaDetector) |
| PVA streamer (IMAGE) |
EPICS clients | Power supply monitoring |
+--------------+ | Auto-reconnect |
| pvget/CSS/ | PVA (IMAGE stream) | |
| other tools |<--------------------->| Acquisition subprocess |
+--------------+ | UDP data recv (UDP:41000) |
| Frame assembly |
| -> locking_shmring |
+-------------------------------+
@@ -29,154 +31,202 @@ Client machine Detector machine
pip install .
```
For HDF5 support (optional):
For notebook progress bars (optional):
```bash
pip install ".[hdf5]"
pip install ".[client]"
```
### Dependencies
- `locking_shmring` — shared memory ring buffer (must be installed separately)
- `p4p` — EPICS PV Access for Python
- `pyzmq`ZeroMQ for bulk data transfer
- `pyyaml`YAML config parsing
- `pyvisa`, `pyvisa-py` — power supply control
- `numpy`, `pyserial`
## Configuration
Create a YAML config file:
```yaml
detector:
ip: "10.0.0.32"
register_port: 42000
data_port: 41000
auto_reconnect: true
reconnect_interval: 5.0
adc:
clock_frequency_mhz: 50
data_delay: 0x180
order_7to0: 0x89abcdef
order_15to8: 0x01234567
digital_signals:
order_7to0: 0x24FFFFFF
order_15to8: 0xF5F310FF
polarity: 0x0014
defaults:
exposure_mode: 3
trigger_mode: external
trigger_polarity: rising
integration_time_ms: 4.0
server:
pv_prefix: "TEAM1K:"
client_port: 42010
peripherals:
bellow_port: "/dev/CameraBellowStage"
detector_power:
port: "/dev/DetectorPowerSupply"
voltage_step: 0.2
channels:
1: { voltage: 5.0, current: 2.0, ovp: 6.0 }
2: { voltage: 3.3, current: 1.0, ovp: 4.0 }
tec:
port: "/dev/TECsPowerSupply"
voltage_step: 0.2
channels:
1: { voltage: 12.0, current: 3.0, ovp: 14.0 }
```
## Running the server
```bash
team1k-server --detector-ip 10.0.0.32 --log-level INFO
team1k-server --config config.yaml --log-level INFO
```
All options:
Options:
```
--detector-ip Detector IP address (default: 10.0.0.32)
--register-port Detector register port (default: 42000)
--data-port Detector data port (default: 41000)
--pv-prefix PVA prefix (default: TEAM1K:)
--zmq-port ZMQ data transfer port (default: 42005)
--config Parameter file to apply on startup
--bellow-port Bellow stage serial port (default: /dev/CameraBellowStage)
--config YAML config file path
--detector-ip Override detector IP address
--pv-prefix Override PVA prefix
--client-port Override TCP client port
--log-level DEBUG, INFO, WARNING, ERROR (default: INFO)
```
The server starts DAQ automatically after initialization (continuous mode). The PVA IMAGE stream is always live.
## Client usage
### Acquire frames
The Python client connects via TCP. No EPICS/p4p installation needed.
### Basic usage
```python
from team1k import Client, ExposureModes
from team1k import Client
client = Client(data_host="detector-machine")
client = Client("detector-machine")
# Configure
client.set_exposure_mode(ExposureModes.GLOBAL_SHUTTER_CDS)
client.set_integration_time(6.0) # ms
client.exposure_mode = 3 # GlobalCDS
client.trigger_mode = 1 # External
client.integration_time = 4.0 # ms
# One-shot: start DAQ, capture 100 frames, stop DAQ
frames = client.acquire_frames(100)
# Status
print(client.state) # "Acquiring"
print(client.frame_rate) # 500.0
# Capture frames (shows progress bar)
frames = client.capture(100)
print(frames.shape) # (100, 1024, 1024)
print(frames.dtype) # uint16
client.close()
```
### Manual DAQ control
### Capture modes
```python
client = Client(data_host="detector-machine")
# Fixed number of frames
frames = client.capture(num_frames=100)
client.start_daq()
# Capture for a duration
frames = client.capture(duration=5.0)
# Grab frames from the running stream
batch1 = client.get_frames(50)
batch2 = client.get_frames(50)
# Open-ended capture (start/stop)
stream = client.start_capture()
# ... do something ...
frames = stream.stop()
client.stop_daq()
client.close()
# Disable progress bar
frames = client.capture(100, progress=False)
```
### Live image monitoring
### Peripherals
```python
client = Client()
def on_frame(image):
print(f"Frame received: {image.shape}")
client.monitor_image(on_frame)
# ... later
client.stop_monitor("IMAGE")
client.close()
```
### Status
```python
client = Client()
print(client.get_status())
print(client.is_acquiring())
print(client.get_frame_rate())
print(client.get_exposure_mode())
print(client.get_integration_time())
client.close()
```
### Other commands
```python
client.set_trigger_mode(TriggerModes.EXTERNAL)
client.set_adc_clock_freq(60.0) # MHz
client.set_adc_data_delay(0x1A7)
client.load_parameter_file("/path/to/params.txt")
client.set_test_mode(True) # FPGA test pattern
client.reset_connection()
# Peripherals
# Bellow stage
client.insert_detector()
client.retract_detector()
# Detector power
client.power_on()
client.power_off()
print(client.power_status) # {1: {"voltage": 5.01, "current": 1.2}, ...}
# TEC
client.tec_on()
client.tec_off()
print(client.tec_status)
```
### Advanced
```python
# Direct register access
value = client.read_register(22)
client.write_register(22, 3)
# Reconnect after power cycle
client.power_on()
client.reconnect()
# Full status
print(client.status())
# Context manager
with Client("detector-machine") as c:
frames = c.capture(100)
```
## PV Access interface
All PVs are prefixed with `TEAM1K:` by default.
All PVs are prefixed with `TEAM1K:` by default. Follows areaDetector conventions with RW + `_RBV` readback pattern.
### Status PVs (read-only)
### Acquisition PVs
| PV | Type | Description |
|----|------|-------------|
| `STATUS` | string[] | Server status |
| `ACQUIRING` | bool | DAQ running |
| `FRAME_RATE` | float | Current frame rate (Hz) |
| `FRAME_COUNT` | int | Total frames acquired |
| `EXPOSURE_MODE` | int | Current exposure mode (0-3) |
| `TRIGGER_MODE` | int | Current trigger mode |
| `INTEGRATION_TIME` | float | Integration time (ms) |
| `CAPTURE:STATUS` | string | IDLE / CAPTURING / READY / ERROR |
| `CAPTURE:PROGRESS` | int | Frames captured so far |
| `CAPTURE:TOTAL` | int | Frames requested |
| `IMAGE` | NTNDArray | Live image stream |
| PV | Type | RW | Description |
|----|------|----|-------------|
| `Acquire` / `_RBV` | bool | RW | Start/stop DAQ |
| `AcquireTime` / `_RBV` | float | RW | Integration time (seconds) |
| `ExposureMode` / `_RBV` | enum | RW | Rolling/Rolling with Pause/Global/Global with CDS |
| `TriggerMode` / `_RBV` | enum | RW | Internal/External |
| `ArrayCounter_RBV` | int | RO | Total frames acquired |
| `FrameRate_RBV` | float | RO | Current frame rate (Hz) |
| `DetectorState_RBV` | enum | RO | Disconnected/Initializing/Idle/Acquiring/Error |
| `MaxSizeX_RBV` / `MaxSizeY_RBV` | int | RO | 1024, 1024 |
| `IMAGE` | NTNDArray | RO | Live image stream |
### Command PVs (writable)
### Peripheral PVs
| PV | Type | Description |
|----|------|-------------|
| `CMD:EXPOSURE_MODE` | int | Set exposure mode |
| `CMD:TRIGGER_MODE` | int | Set trigger mode |
| `CMD:INTEGRATION_TIME` | float | Set integration time (ms) |
| `CMD:START_DAQ` | int | 1=start, 0=stop |
| `CMD:CAPTURE` | int | Capture N frames |
| `CMD:ADC_CLOCK_FREQ` | float | Set ADC clock (MHz) |
| `CMD:ADC_DATA_DELAY` | int | Set ADC data delay |
| `CMD:PARAMETER_FILE` | string | Apply parameter file |
| `CMD:RESET` | int | 1=reset connection |
| `CMD:TEST_MODE` | int | 1=enable FPGA test data |
| PV | Type | RW | Description |
|----|------|----|-------------|
| `Bellow:Insert` | bool | RW | Write true to insert |
| `Bellow:Retract` | bool | RW | Write true to retract |
| `Bellow:Position_RBV` | int | RO | Current position |
| `Power:Enable` / `_RBV` | bool | RW | Detector power on/off |
| `Power:ChN:Voltage_RBV` | float | RO | Channel N voltage |
| `Power:ChN:Current_RBV` | float | RO | Channel N current |
| `TEC:Enable` / `_RBV` | bool | RW | TEC power on/off |
| `TEC:ChN:Voltage_RBV` | float | RO | Channel N voltage |
| `TEC:ChN:Current_RBV` | float | RO | Channel N current |
## Package structure
@@ -189,17 +239,21 @@ src/team1k/
chip_config.py # Chip constants (1024x1024, packet layout)
commands.py # Detector commands (exposure, trigger, etc.)
adc.py # Si570 I2C clock programming
parameter_file.py # Parameter file parser
acquisition/ # High-throughput data path
receiver.py # Acquisition subprocess (UDP -> shmring)
filewriter/ # On-demand frame capture
capture.py # FrameCapture + ZMQ DataTransferServer
pva/ # EPICS PV Access
interface.py # PVA server, command/status PVs
pva/ # EPICS PV Access (areaDetector-style)
interface.py # PVA server, RW PVs with _RBV readbacks
streamer.py # shmring -> NTNDArray live stream
peripherals/ # Hardware peripherals
power_base.py # VISA power supply base class
power_supply.py # Detector power supply
tec.py # TEC controller
bellow_stage.py # Camera bellow stage (serial)
power_supply.py # Power supply control
config.py # YAML configuration loader
state.py # DetectorState enum
server.py # Main server entry point
Client.py # PVA + ZMQ client library
tcp_server.py # TCP client handler
tcp_protocol.py # Shared TCP message framing
capture.py # Buffered frame capture (shmring -> TCP)
client.py # Pure-TCP client library
```

80
config.yaml Normal file
View File

@@ -0,0 +1,80 @@
adc:
clock_frequency_mhz: 50.0
data_delay: 384
order_15to8: 19088743
order_7to0: 2309737967
config_version: 1
defaults:
exposure_mode: 3
integration_time_ms: 4.0
trigger_mode: 1
trigger_polarity: 0
detector:
auto_reconnect: true
data_port: 41000
ip: 10.0.0.32
reconnect_interval: 5.0
register_port: 42000
digital:
order_15to8: 4126347519
order_7to0: 620756991
polarity: 20
peripherals:
bellow_stage:
absolute_positioning: false
baudrate: 9600
inserted_position_um: 0
port: /dev/CameraBellowStage
retracted_position_um: -2000000
speed: 100000
detector_power:
channels:
'1':
channel: 1
current: 0.0
ovp: 0.0
voltage: 0.0
'2':
channel: 2
current: 0.0
ovp: 0.0
voltage: 0.0
'3':
channel: 3
current: 0.0
ovp: 0.0
voltage: 0.0
'4':
channel: 4
current: 0.0
ovp: 0.0
voltage: 0.0
port: ''
voltage_step: 0.2
tec:
channels:
'1':
channel: 1
current: 0.0
ovp: 0.0
voltage: 0.0
'2':
channel: 2
current: 0.0
ovp: 0.0
voltage: 0.0
'3':
channel: 3
current: 0.0
ovp: 0.0
voltage: 0.0
'4':
channel: 4
current: 0.0
ovp: 0.0
voltage: 0.0
port: ''
voltage_step: 0.2
server:
client_port: 42010
pv_prefix: 'TEAM1K:'

View File

@@ -4,22 +4,25 @@ build-backend = "setuptools.build_meta"
[project]
name = "team1k"
version = "0.0.1"
version = "0.1.0"
authors = [
{ name="Sebastian Strempfer", email="sstrempfer@anl.gov" },
]
description = "Controls for the TEAM1k detector"
readme = "README.md"
requires-python = ">=3.9"
requires-python = ">=3.10"
dependencies = [
"numpy >= 1.24.0",
"pyserial >= 3.5",
"p4p >= 4.1.0",
"pyzmq >= 25.0.0",
"pyyaml >= 6.0",
"pyvisa >= 1.13.0",
"pyvisa-py >= 0.7.0",
"dataclass-mage >= 0.25.1",
]
[project.optional-dependencies]
hdf5 = ["h5py >= 3.0.0"]
client = ["tqdm >= 4.60.0"]
[project.scripts]
team1k-server = "team1k.server:main"

View File

@@ -1,152 +0,0 @@
Metadata-Version: 2.4
Name: team1k
Version: 0.0.1
Summary: Controls for the TEAM1k detector
Author-email: Sebastian Strempfer <sstrempfer@anl.gov>
Requires-Python: >=3.9
Description-Content-Type: text/markdown
Requires-Dist: numpy>=1.24.0
Requires-Dist: pyserial>=3.5
Requires-Dist: pydantic>=2.0.0
Requires-Dist: typing-extensions>=4.5.0
# Team1k: X-ray Detector Control System
This package provides control software for the Team1k X-ray detector system, including:
1. A server that manages:
- Camera bellow stage (linear motion)
- Power supply control
- Detector control via KTest commands
- Data streaming to EPICS PVA
2. Client libraries for both synchronous and asynchronous operation
## Installation
```bash
pip install -e .
```
## Server Usage
Start the Team1k server:
```bash
team1k-server
```
The server exposes:
- ZMQ REP socket on port 42003 for commands
- ZMQ PUB socket on port 42004 for status updates
- ZMQ SUB socket on port 42005 for data from NetworkInterface.cxx
- EPICS PVA for data streaming (if p4p is installed)
## Client Usage
### Basic Example
```python
from team1k import Client, ExposureModes, TriggerModes
# Create a client
client = Client('localhost')
# Control the bellow stage
client.insert_detector()
client.retract_detector()
# Control the power supply
client.power_supply_on()
voltages = client.read_voltages()
client.power_supply_off()
# Configure the detector
client.set_exposure_mode(ExposureModes.ROLLING_SHUTTER)
client.set_trigger_mode(TriggerModes.INTERNAL_TRIGGER)
client.set_integration_time(100.0) # ms
# Start/stop acquisition
client.start_daq()
client.stop_daq()
# Close the client
client.close()
```
### Async Client
```python
import asyncio
from team1k import AsyncClient
async def main():
# Create an async client
client = AsyncClient('localhost')
# Subscribe to status updates
def status_callback(status):
print(f"Status update: {status}")
await client.subscribe_to_status(status_callback)
# Get current status
status = await client.get_status()
print(f"Current status: {status}")
# Insert detector
await client.insert_detector()
# Close client
await client.close()
asyncio.run(main())
```
## Example Scripts
The `examples` directory contains sample scripts:
- `client_demo.py`: Demonstrates synchronous and asynchronous client usage
- `status_monitor.py`: Real-time monitoring of system status
## Server Commands
The server accepts the following commands:
| Command | Description |
|---------|-------------|
| `insert` | Insert the detector into the beam |
| `retract` | Retract the detector from the beam |
| `power_on` | Turn on the detector power supply |
| `power_off` | Turn off the detector power supply |
| `read_voltages` | Read the voltage values from the power supply |
| `get_status` | Get the current status of all devices |
| `subscribe PORT` | Subscribe to status updates with a callback port |
| `unsubscribe` | Unsubscribe from status updates |
| `kill` | Shut down the server |
Additionally, all commands from the KTestCommand enum are supported.
## Architecture
The system uses a distributed architecture:
1. **Team1k Server**:
- Manages all devices
- Provides command interface
- Publishes status updates
- Streams data to EPICS PVA
2. **NetworkInterface.cxx**:
- Communicates with the detector hardware
- Streams data to a ZMQ socket
3. **Team1k Client**:
- Connects to the server
- Sends commands
- Receives status updates
4. **EPICS PVA**:
- Provides standard interface for data acquisition systems
- Allows integration with other EPICS tools

View File

@@ -1,12 +0,0 @@
README.md
pyproject.toml
src/team1k/Client.py
src/team1k/KTestClient.py
src/team1k/__init__.py
src/team1k/server.py
src/team1k.egg-info/PKG-INFO
src/team1k.egg-info/SOURCES.txt
src/team1k.egg-info/dependency_links.txt
src/team1k.egg-info/entry_points.txt
src/team1k.egg-info/requires.txt
src/team1k.egg-info/top_level.txt

View File

@@ -1 +0,0 @@

View File

@@ -1,2 +0,0 @@
[console_scripts]
team1k-server = team1k.server:main

View File

@@ -1,4 +0,0 @@
numpy>=1.24.0
pyserial>=3.5
pydantic>=2.0.0
typing-extensions>=4.5.0

View File

@@ -1 +0,0 @@
team1k

View File

@@ -1,406 +0,0 @@
#!/usr/bin/env python3
"""
Team1k Client — PVA + ZMQ interface to the Team1k detector server.
Commands and status use EPICS PV Access (p4p).
Bulk frame retrieval uses ZMQ for efficient transfer over the network.
Typical usage::
client = Client()
client.set_exposure_mode(ExposureModes.GLOBAL_SHUTTER_CDS)
client.set_integration_time(6.0)
# One-shot: start DAQ, grab 100 frames, stop DAQ
frames = client.acquire_frames(100)
print(frames.shape) # (100, 1024, 1024)
# Or manual control
client.start_daq()
frames = client.get_frames(50)
client.stop_daq()
client.close()
"""
import enum
import json
import time
import logging
from typing import Any, Callable
import numpy as np
import zmq
from p4p.client.thread import Context, Subscription
logger = logging.getLogger(__name__)
DEFAULT_PV_PREFIX = "TEAM1K:"
DEFAULT_ZMQ_PORT = 42005
class ExposureModes(enum.IntEnum):
"""Detector exposure modes."""
ROLLING_SHUTTER = 0
GLOBAL_SHUTTER = 1
GLOBAL_SHUTTER_FLUSH = 2
GLOBAL_SHUTTER_CDS = 3
class TriggerModes(enum.IntEnum):
"""Detector trigger modes."""
INTERNAL = 0
EXTERNAL = 1
class CommandError(Exception):
"""Raised when a PVA command put or ZMQ transfer fails."""
pass
class Client:
"""
Client for the Team1k detector server.
Uses PVA for commands/status and ZMQ for bulk frame data transfer.
Args:
prefix: PV name prefix (default: "TEAM1K:").
data_host: Hostname/IP of the server's ZMQ data port.
data_port: ZMQ port for frame data transfer.
timeout: Default timeout in seconds for PVA operations.
"""
def __init__(self, prefix: str = DEFAULT_PV_PREFIX,
data_host: str = "localhost",
data_port: int = DEFAULT_ZMQ_PORT,
timeout: float = 5.0):
self._prefix = prefix
self._data_host = data_host
self._data_port = data_port
self._timeout = timeout
self._ctx = Context("pva")
self._zmq_ctx = zmq.Context()
self._subscriptions: dict[str, Subscription] = {}
def _pv(self, name: str) -> str:
"""Build full PV name."""
return f"{self._prefix}{name}"
def _put(self, pv_suffix: str, value: Any) -> None:
"""Put a value to a command PV."""
pv_name = self._pv(pv_suffix)
try:
self._ctx.put(pv_name, value, timeout=self._timeout)
except Exception as e:
raise CommandError(f"Failed to put {pv_name}={value}: {e}") from e
def _get(self, pv_suffix: str) -> Any:
"""Get the current value of a PV."""
pv_name = self._pv(pv_suffix)
try:
result = self._ctx.get(pv_name, timeout=self._timeout)
return result.value if hasattr(result, 'value') else result
except Exception as e:
raise CommandError(f"Failed to get {pv_name}: {e}") from e
# --- Frame retrieval (the primary data interface) ---
def get_frames(self, num_frames: int, timeout: float = 120.0) -> np.ndarray:
"""
Capture frames from the running DAQ and transfer them.
DAQ must already be running. This triggers a server-side capture of
N consecutive frames, waits for completion, then fetches the data
over ZMQ.
Args:
num_frames: Number of frames to capture.
timeout: Maximum time to wait for capture + transfer (seconds).
Returns:
numpy array of shape (num_frames, ny, nx) with dtype uint16.
Raises:
CommandError: If capture fails, times out, or transfer errors.
"""
# Trigger server-side capture
self._put("CMD:CAPTURE", num_frames)
# Poll capture status until READY
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
status = self._get("CAPTURE:STATUS")
if status == "READY":
break
if status == "ERROR":
raise CommandError("Server-side capture failed")
time.sleep(0.1)
else:
raise CommandError(
f"Capture timed out after {timeout}s "
f"(status={self._get('CAPTURE:STATUS')})"
)
# Fetch data over ZMQ
remaining = deadline - time.monotonic()
return self._fetch_zmq(timeout=max(remaining, 5.0))
def acquire_frames(self, num_frames: int, timeout: float = 120.0) -> np.ndarray:
"""
Start DAQ, capture N frames, stop DAQ, and return the data.
Convenience method for one-shot acquisitions. Equivalent to::
client.start_daq()
frames = client.get_frames(num_frames)
client.stop_daq()
Args:
num_frames: Number of frames to capture.
timeout: Maximum time to wait (seconds).
Returns:
numpy array of shape (num_frames, ny, nx) with dtype uint16.
"""
self.start_daq()
try:
return self.get_frames(num_frames, timeout=timeout)
finally:
self.stop_daq()
def _fetch_zmq(self, timeout: float = 60.0) -> np.ndarray:
"""Fetch captured frame data from the server via ZMQ."""
sock = self._zmq_ctx.socket(zmq.REQ)
sock.setsockopt(zmq.RCVTIMEO, int(timeout * 1000))
sock.setsockopt(zmq.LINGER, 0)
sock.connect(f"tcp://{self._data_host}:{self._data_port}")
try:
sock.send(b"FETCH")
parts = sock.recv_multipart()
if len(parts) != 2:
raise CommandError(f"Unexpected ZMQ response: {len(parts)} parts")
header = json.loads(parts[0])
if "error" in header:
raise CommandError(f"Server error: {header['error']}")
data = parts[1]
n = header["num_frames"]
ny = header["ny"]
nx = header["nx"]
dtype = np.dtype(header.get("dtype", "uint16"))
expected_bytes = n * ny * nx * dtype.itemsize
if len(data) != expected_bytes:
raise CommandError(
f"Data size mismatch: got {len(data)}, "
f"expected {expected_bytes}"
)
arr = np.frombuffer(data, dtype=dtype).copy()
return arr.reshape(n, ny, nx)
except zmq.Again:
raise CommandError(f"ZMQ fetch timed out after {timeout}s")
finally:
sock.close()
# --- Detector commands ---
def set_exposure_mode(self, mode: ExposureModes | int) -> None:
"""Set the detector exposure mode (0-3)."""
self._put("CMD:EXPOSURE_MODE", int(mode))
def set_trigger_mode(self, mode: TriggerModes | int) -> None:
"""Set the trigger mode (0=internal, 1=external)."""
self._put("CMD:TRIGGER_MODE", int(mode))
def set_integration_time(self, time_ms: float) -> None:
"""Set the integration time in milliseconds."""
self._put("CMD:INTEGRATION_TIME", time_ms)
def start_daq(self) -> None:
"""Start data acquisition."""
self._put("CMD:START_DAQ", 1)
def stop_daq(self) -> None:
"""Stop data acquisition."""
self._put("CMD:START_DAQ", 0)
def set_adc_clock_freq(self, freq_mhz: float) -> None:
"""Set ADC clock frequency in MHz (50-100)."""
self._put("CMD:ADC_CLOCK_FREQ", freq_mhz)
def set_adc_data_delay(self, delay: int) -> None:
"""Set ADC data delay."""
self._put("CMD:ADC_DATA_DELAY", delay)
def load_parameter_file(self, file_path: str) -> None:
"""Apply a parameter file on the server."""
self._put("CMD:PARAMETER_FILE", file_path)
def reset_connection(self) -> None:
"""Reset the detector connection (stop DAQ, reinitialize)."""
self._put("CMD:RESET", 1)
def set_test_mode(self, enable: bool) -> None:
"""Enable or disable FPGA test data generation."""
self._put("CMD:TEST_MODE", 1 if enable else 0)
# --- Peripheral commands ---
def insert_detector(self) -> None:
"""Insert the detector (move bellow stage)."""
self._put("BELLOW:CMD", 1)
def retract_detector(self) -> None:
"""Retract the detector (move bellow stage)."""
self._put("BELLOW:CMD", 0)
def power_on(self) -> None:
"""Turn on the detector power supply."""
self._put("POWER:CMD", 1)
def power_off(self) -> None:
"""Turn off the detector power supply."""
self._put("POWER:CMD", 0)
# --- Status getters ---
def get_status(self) -> list[str]:
"""Get server status strings."""
val = self._get("STATUS")
return list(val) if not isinstance(val, str) else [val]
def is_acquiring(self) -> bool:
"""Check if DAQ is running."""
return bool(self._get("ACQUIRING"))
def get_frame_rate(self) -> float:
"""Get current frame rate in Hz."""
return float(self._get("FRAME_RATE"))
def get_frame_count(self) -> int:
"""Get total frames acquired."""
return int(self._get("FRAME_COUNT"))
def get_exposure_mode(self) -> int:
"""Get current exposure mode."""
return int(self._get("EXPOSURE_MODE"))
def get_trigger_mode(self) -> int:
"""Get current trigger mode."""
return int(self._get("TRIGGER_MODE"))
def get_integration_time(self) -> float:
"""Get current integration time in ms."""
return float(self._get("INTEGRATION_TIME"))
def get_capture_status(self) -> str:
"""Get capture status (IDLE, CAPTURING, READY, ERROR)."""
return str(self._get("CAPTURE:STATUS"))
def get_capture_progress(self) -> tuple[int, int]:
"""Get capture progress as (captured, total)."""
progress = int(self._get("CAPTURE:PROGRESS"))
total = int(self._get("CAPTURE:TOTAL"))
return progress, total
def get_bellow_position(self) -> int:
"""Get bellow stage position."""
return int(self._get("BELLOW:POSITION"))
def get_power_voltages(self) -> np.ndarray:
"""Get power supply voltage readings."""
return np.asarray(self._get("POWER:VOLTAGES"))
# --- Monitors (live subscriptions) ---
def monitor_image(self, callback: Callable[[np.ndarray], None]) -> None:
"""
Subscribe to live image updates.
The callback receives a 2D numpy array for each new frame.
Only one image monitor can be active at a time.
Args:
callback: Function called with numpy array for each frame.
"""
self.stop_monitor("IMAGE")
pv_name = self._pv("IMAGE")
def _on_image(value):
try:
arr = value.value
if hasattr(value, 'dimension') and value.dimension:
shape = tuple(d.size for d in value.dimension)
arr = arr.reshape(shape)
callback(arr)
except Exception as e:
logger.debug("Image monitor error: %s", e)
self._subscriptions["IMAGE"] = self._ctx.monitor(pv_name, _on_image)
def monitor_status(self, pv_name: str,
callback: Callable[[Any], None]) -> None:
"""
Subscribe to a status PV for live updates.
Args:
pv_name: PV suffix (e.g., "FRAME_RATE", "ACQUIRING").
callback: Function called with the new value on each update.
"""
self.stop_monitor(pv_name)
full_name = self._pv(pv_name)
def _on_update(value):
try:
val = value.value if hasattr(value, 'value') else value
callback(val)
except Exception as e:
logger.debug("Monitor %s error: %s", pv_name, e)
self._subscriptions[pv_name] = self._ctx.monitor(full_name, _on_update)
def stop_monitor(self, name: str) -> None:
"""Stop a specific monitor subscription."""
sub = self._subscriptions.pop(name, None)
if sub is not None:
sub.close()
def stop_all_monitors(self) -> None:
"""Stop all active monitor subscriptions."""
for sub in self._subscriptions.values():
sub.close()
self._subscriptions.clear()
def close(self) -> None:
"""Close the client and release all resources."""
self.stop_all_monitors()
self._ctx.close()
self._zmq_ctx.term()
# Example usage
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
client = Client(data_host="detector-machine")
try:
print("Status:", client.get_status())
# One-shot acquisition of 10 frames
client.set_exposure_mode(ExposureModes.GLOBAL_SHUTTER_CDS)
client.set_integration_time(6.0)
frames = client.acquire_frames(10)
print(f"Got {frames.shape[0]} frames, shape={frames.shape}")
except CommandError as e:
print(f"Error: {e}")
finally:
client.close()

View File

@@ -1,8 +1,7 @@
from .Client import Client, ExposureModes, TriggerModes, CommandError
from .client import Client
from .state import DetectorState
__all__ = [
'Client',
'ExposureModes',
'TriggerModes',
'CommandError',
'DetectorState',
]

333
src/team1k/capture.py Normal file
View File

@@ -0,0 +1,333 @@
"""
Buffered frame capture with TCP streaming.
Captures frames from the shared memory ring buffer to a temp file
(at detector speed), then streams them to the client over TCP
(at network speed). The temp file buffers the speed difference.
This capture is completely independent of the PVA IMAGE stream —
it uses its own RingBufferReader.
"""
import os
import time
import logging
import tempfile
import threading
from pathlib import Path
import numpy as np
from .detector.chip_config import ChipConfig, TEAM1K_CONFIG
from .tcp_protocol import send_msg, send_bytes
logger = logging.getLogger(__name__)
class CaptureHandle:
"""Handle for an in-progress open-ended (start/stop) capture."""
def __init__(self):
self.stop_event = threading.Event()
self.num_captured = 0
self.error: str | None = None
class BufferedCapture:
"""
Captures frames from shmring with temp file buffering.
Uses its own RingBufferReader, independent of the PVA streamer.
Only one capture can run at a time (enforced externally by tcp_server).
"""
def __init__(self, ring_name: str, chip_config: ChipConfig = TEAM1K_CONFIG):
self._ring_name = ring_name
self._config = chip_config
self._frame_bytes = chip_config.image_nx * chip_config.image_ny * 2 # uint16
def capture_to_socket(self, sock, num_frames: int) -> int:
"""
Capture N frames, stream them to the client socket.
Phase 1 (writer thread): shmring → temp file (fast, detector speed)
Phase 2 (this thread): temp file → TCP socket (network speed)
Both phases run concurrently; the temp file acts as buffer.
Returns:
Number of frames sent to client.
"""
from locking_shmring import RingBufferReader
# Send header
send_msg(sock, {
"ok": True,
"num_frames": num_frames,
"nx": self._config.image_nx,
"ny": self._config.image_ny,
"dtype": "uint16",
"frame_bytes": self._frame_bytes,
})
fd, temp_path = tempfile.mkstemp(suffix=".raw", prefix="team1k_cap_")
os.close(fd)
writer_done = threading.Event()
frames_written = [0] # mutable counter shared with writer thread
writer_error = [None]
def _writer():
"""Read frames from shmring → temp file."""
try:
reader = RingBufferReader(self._ring_name)
with open(temp_path, "wb") as f:
for slot in reader.iter_frames(timeout_ms=5000):
with slot:
frame = slot.data_as_numpy(
(self._config.image_ny, self._config.image_nx),
np.uint16,
)
f.write(frame.tobytes())
frames_written[0] += 1
if frames_written[0] >= num_frames:
break
reader.close()
except Exception as e:
writer_error[0] = str(e)
logger.error("Capture writer error: %s", e)
finally:
writer_done.set()
# Start writer thread
writer = threading.Thread(target=_writer, daemon=True,
name="team1k-cap-writer")
writer.start()
# Stream from temp file → socket as data becomes available
frames_sent = 0
try:
with open(temp_path, "rb") as f:
while frames_sent < num_frames:
# Wait for writer to produce more data
while frames_written[0] <= frames_sent:
if writer_done.is_set():
break
time.sleep(0.001)
if frames_written[0] <= frames_sent and writer_done.is_set():
break
# Read and send available frames
while frames_sent < frames_written[0] and frames_sent < num_frames:
frame_data = f.read(self._frame_bytes)
if len(frame_data) != self._frame_bytes:
break
send_bytes(sock, frame_data)
frames_sent += 1
except (ConnectionError, BrokenPipeError) as e:
logger.warning("Client disconnected during capture: %s", e)
finally:
writer_done.wait(timeout=10)
try:
Path(temp_path).unlink(missing_ok=True)
except Exception:
pass
# Send completion
error = writer_error[0]
send_msg(sock, {
"done": True,
"num_frames": frames_sent,
"error": error,
})
if error:
logger.error("Capture completed with error: %s (sent %d/%d)",
error, frames_sent, num_frames)
else:
logger.info("Capture complete: sent %d/%d frames",
frames_sent, num_frames)
return frames_sent
def capture_duration_to_socket(self, sock, duration_sec: float) -> int:
"""
Capture frames for a specified duration, stream to client.
Since frame count isn't known upfront, each frame is sent with
an 8-byte length prefix. A completion message signals the end.
Returns:
Number of frames sent.
"""
from locking_shmring import RingBufferReader
# Send header with unknown frame count
send_msg(sock, {
"ok": True,
"num_frames": -1,
"nx": self._config.image_nx,
"ny": self._config.image_ny,
"dtype": "uint16",
"frame_bytes": self._frame_bytes,
})
fd, temp_path = tempfile.mkstemp(suffix=".raw", prefix="team1k_dur_")
os.close(fd)
writer_done = threading.Event()
frames_written = [0]
def _writer():
try:
reader = RingBufferReader(self._ring_name)
deadline = time.monotonic() + duration_sec
with open(temp_path, "wb") as f:
for slot in reader.iter_frames(timeout_ms=1000):
with slot:
frame = slot.data_as_numpy(
(self._config.image_ny, self._config.image_nx),
np.uint16,
)
f.write(frame.tobytes())
frames_written[0] += 1
if time.monotonic() >= deadline:
break
reader.close()
except Exception as e:
logger.error("Duration capture writer error: %s", e)
finally:
writer_done.set()
writer = threading.Thread(target=_writer, daemon=True,
name="team1k-dur-writer")
writer.start()
frames_sent = 0
try:
with open(temp_path, "rb") as f:
while True:
while frames_written[0] <= frames_sent:
if writer_done.is_set():
break
time.sleep(0.001)
if frames_written[0] <= frames_sent and writer_done.is_set():
break
while frames_sent < frames_written[0]:
frame_data = f.read(self._frame_bytes)
if len(frame_data) != self._frame_bytes:
break
send_bytes(sock, frame_data)
frames_sent += 1
except (ConnectionError, BrokenPipeError) as e:
logger.warning("Client disconnected during capture: %s", e)
finally:
writer_done.wait(timeout=max(duration_sec + 5, 10))
try:
Path(temp_path).unlink(missing_ok=True)
except Exception:
pass
send_msg(sock, {"done": True, "num_frames": frames_sent})
logger.info("Duration capture complete: %d frames in %.1fs",
frames_sent, duration_sec)
return frames_sent
def start_streaming_to_socket(self, sock) -> CaptureHandle:
"""
Start open-ended capture. Returns a handle; call handle.stop_event.set()
to stop. Frames are sent with length prefix since count is unknown.
Returns:
CaptureHandle that the caller can use to stop the capture.
"""
handle = CaptureHandle()
send_msg(sock, {
"ok": True,
"num_frames": -1,
"nx": self._config.image_nx,
"ny": self._config.image_ny,
"dtype": "uint16",
"frame_bytes": self._frame_bytes,
})
def _stream():
from locking_shmring import RingBufferReader
fd, temp_path = tempfile.mkstemp(suffix=".raw",
prefix="team1k_stream_")
os.close(fd)
writer_done = threading.Event()
frames_written = [0]
def _writer():
try:
reader = RingBufferReader(self._ring_name)
with open(temp_path, "wb") as f:
for slot in reader.iter_frames(timeout_ms=1000):
with slot:
frame = slot.data_as_numpy(
(self._config.image_ny,
self._config.image_nx),
np.uint16,
)
f.write(frame.tobytes())
frames_written[0] += 1
if handle.stop_event.is_set():
break
reader.close()
except Exception as e:
handle.error = str(e)
logger.error("Streaming writer error: %s", e)
finally:
writer_done.set()
writer = threading.Thread(target=_writer, daemon=True,
name="team1k-stream-writer")
writer.start()
frames_sent = 0
try:
with open(temp_path, "rb") as f:
while True:
while frames_written[0] <= frames_sent:
if writer_done.is_set():
break
time.sleep(0.001)
if frames_written[0] <= frames_sent and writer_done.is_set():
break
while frames_sent < frames_written[0]:
frame_data = f.read(self._frame_bytes)
if len(frame_data) != self._frame_bytes:
break
send_bytes(sock, frame_data)
frames_sent += 1
except (ConnectionError, BrokenPipeError) as e:
logger.warning("Client disconnected during streaming: %s", e)
finally:
handle.stop_event.set()
writer_done.wait(timeout=10)
try:
Path(temp_path).unlink(missing_ok=True)
except Exception:
pass
handle.num_captured = frames_sent
send_msg(sock, {"done": True, "num_frames": frames_sent})
logger.info("Streaming capture complete: %d frames", frames_sent)
stream_thread = threading.Thread(target=_stream, daemon=True,
name="team1k-stream")
stream_thread.start()
return handle

460
src/team1k/client.py Normal file
View File

@@ -0,0 +1,460 @@
"""
Team1k Client — Pure-TCP client for the Team1k detector server.
No EPICS/p4p/zmq dependencies. Only requires numpy (and optionally tqdm
for progress bars in notebooks).
Usage::
from team1k import Client
client = Client("detector-host")
# Configure
client.exposure_mode = 3 # GlobalCDS
client.trigger_mode = 1 # External
client.integration_time = 4.0 # ms
# Capture frames
frames = client.capture(100)
print(frames.shape) # (100, 1024, 1024)
# Or capture by duration
frames = client.capture(duration=5.0)
# Open-ended capture
stream = client.start_capture()
# ... later ...
frames = stream.stop()
client.close()
"""
import io
import socket
import logging
from typing import Any
import numpy as np
from .config import ExposureMode, TriggerMode, TriggerPolarity
from .tcp_protocol import recv_bytes_or_msg, send_msg, recv_msg, recv_exact
logger = logging.getLogger(__name__)
# Default server port (must match server's config.client_port)
DEFAULT_PORT = 42010
class CaptureStream:
"""
Handle for an open-ended streaming capture.
Created by Client.start_capture(). Call stop() to end the capture
and retrieve all frames as a numpy array.
"""
def __init__(self, sock: socket.socket, header: dict):
self._sock = sock
self.nx = header["nx"]
self.ny = header["ny"]
self.frame_bytes = header["frame_bytes"]
self._dtype = np.dtype(header.get("dtype", "uint16"))
self._frames: list[np.ndarray] = []
self._stopped = False
def stop(self, progress: bool = True) -> np.ndarray:
"""
Stop the capture and return all captured frames.
Args:
progress: Show progress bar during final data reception.
Returns:
numpy array of shape (N, ny, nx).
"""
if self._stopped:
if self._frames:
return np.stack(self._frames)
return np.empty((0, self.ny, self.nx), dtype=self._dtype)
# Send stop command — the server checks for this
send_msg(self._sock, {"cmd": "stop_capture"})
# Receive remaining frames until done message
self._receive_frames(progress=progress)
self._stopped = True
if self._frames:
return np.stack(self._frames)
return np.empty((0, self.ny, self.nx), dtype=self._dtype)
def _receive_frames(self, progress: bool = True) -> None:
"""Receive frames until a done message arrives."""
pbar = None
if progress:
pbar = _make_pbar(desc="Receiving", unit="frame")
while True:
# Frames come with 8-byte length prefix (unknown count)
data = recv_bytes_or_msg(self._sock)
# Check if it's a JSON message (done signal)
if isinstance(data, dict):
if data.get("done"):
break
continue
frame = np.frombuffer(data, dtype=self._dtype).reshape(
self.ny, self.nx,
).copy()
self._frames.append(frame)
if pbar is not None:
pbar.update(1)
if pbar is not None:
pbar.close()
class Client:
"""
Client for the Team1k detector server.
Uses a TCP connection for all control and data transfer.
No EPICS/p4p/zmq dependencies required.
Args:
host: Server hostname or IP address.
port: TCP port (default: 42010).
timeout: Connection timeout in seconds.
"""
def __init__(self, host: str = "localhost", port: int = DEFAULT_PORT,
timeout: float = 5.0):
self._sock = socket.create_connection((host, port), timeout=timeout)
self._sock.settimeout(timeout)
self._host = host
self._port = port
self._timeout = timeout
# --- Properties ---
@property
def exposure_mode(self) -> ExposureMode:
"""Current exposure mode (0=Rolling, 1=Rolling with Pause, 2=Global Shutter, 3=Global Shutter CDS)."""
return ExposureMode(self._get("exposure_mode"))
@exposure_mode.setter
def exposure_mode(self, value: ExposureMode | int) -> None:
self._set("exposure_mode", int(value))
@property
def trigger_mode(self) -> TriggerMode:
"""Current trigger mode (0=Internal, 1=External)."""
return TriggerMode(self._get("trigger_mode"))
@trigger_mode.setter
def trigger_mode(self, value: TriggerMode | int) -> None:
self._set("trigger_mode", int(value))
@property
def trigger_polarity(self) -> TriggerPolarity:
"""Current trigger polarity (0=Rising, 1=Falling)."""
return TriggerPolarity(self._get("trigger_polarity"))
@trigger_polarity.setter
def trigger_polarity(self, value: TriggerPolarity | int) -> None:
self._set("trigger_polarity", int(value))
@property
def integration_time(self) -> float:
"""Integration time in milliseconds."""
return self._get("integration_time")
@integration_time.setter
def integration_time(self, value: float) -> None:
self._set("integration_time", value)
@property
def frame_rate(self) -> float:
"""Current frame rate in Hz (read-only)."""
return self._get("frame_rate")
@property
def frame_count(self) -> int:
"""Total frames acquired (read-only)."""
return self._get("frame_count")
@property
def state(self) -> str:
"""Detector state (Disconnected/Initializing/Idle/Acquiring/Error)."""
return self.status()["state"]
# --- Capture ---
def capture(self, num_frames: int | None = None,
duration: float | None = None,
progress: bool = True) -> np.ndarray:
"""
Capture frames from the detector.
Specify either num_frames or duration (not both). The DAQ must be
running (it runs continuously by default).
Args:
num_frames: Number of frames to capture.
duration: Duration in seconds to capture.
progress: Show tqdm progress bar (default True).
Returns:
numpy array of shape (N, ny, nx), dtype uint16.
"""
if num_frames is not None and duration is not None:
raise ValueError("Specify num_frames or duration, not both")
if num_frames is None and duration is None:
raise ValueError("Specify num_frames or duration")
if num_frames is not None:
send_msg(self._sock, {"cmd": "capture", "num_frames": num_frames})
else:
send_msg(self._sock, {"cmd": "capture", "duration_sec": duration})
# Receive header
self._sock.settimeout(None) # no timeout during capture
try:
header = recv_msg(self._sock)
except Exception:
self._sock.settimeout(self._timeout)
raise
if not header.get("ok"):
self._sock.settimeout(self._timeout)
raise RuntimeError(header.get("error", "Capture failed"))
n = header["num_frames"]
nx = header["nx"]
ny = header["ny"]
frame_bytes = header["frame_bytes"]
dtype = np.dtype(header.get("dtype", "uint16"))
try:
if n > 0:
# Known frame count: frames sent as raw bytes, no length prefix
return self._recv_fixed_frames(
n, nx, ny, frame_bytes, dtype, progress,
)
else:
# Unknown frame count (duration): frames with length prefix
return self._recv_variable_frames(
nx, ny, dtype, progress,
)
finally:
self._sock.settimeout(self._timeout)
def _recv_fixed_frames(self, n: int, nx: int, ny: int,
frame_bytes: int, dtype: np.dtype,
progress: bool) -> np.ndarray:
"""Receive a known number of frames (no length prefix per frame)."""
frames = np.empty((n, ny, nx), dtype=dtype)
iterator = range(n)
pbar = None
if progress:
pbar = _make_pbar(total=n, desc="Capturing", unit="frame")
for i in iterator:
raw = recv_bytes_or_msg(self._sock, frame_bytes)
if not isinstance(raw, bytes):
raise ValueError(f"Expected frame data but got message: {raw}")
frames[i] = np.frombuffer(raw, dtype=dtype).reshape(ny, nx)
if pbar is not None:
pbar.update(1)
if pbar is not None:
pbar.close()
# Receive completion message
done = recv_msg(self._sock)
if done.get("error"):
logger.warning("Capture completed with error: %s", done["error"])
return frames
def _recv_variable_frames(self, nx: int, ny: int, dtype: np.dtype,
progress: bool) -> np.ndarray:
"""Receive frames with length prefix until done message."""
import json
frame_list = []
pbar = None
if progress:
pbar = _make_pbar(desc="Capturing", unit="frame")
while True:
data = recv_bytes_or_msg(self._sock)
# Check for done message (JSON)
if isinstance(data, dict):
if data.get("done"):
break
continue
frame = np.frombuffer(data, dtype=dtype).reshape(ny, nx).copy()
frame_list.append(frame)
if pbar is not None:
pbar.update(1)
if pbar is not None:
pbar.close()
if frame_list:
return np.stack(frame_list)
return np.empty((0, ny, nx), dtype=dtype)
def start_capture(self) -> CaptureStream:
"""
Start an open-ended streaming capture.
Returns a CaptureStream handle. Call handle.stop() to end the
capture and get the frames as a numpy array.
Example::
stream = client.start_capture()
time.sleep(5)
frames = stream.stop()
"""
send_msg(self._sock, {"cmd": "start_capture"})
self._sock.settimeout(None)
header = recv_msg(self._sock)
if not header.get("ok"):
self._sock.settimeout(self._timeout)
raise RuntimeError(header.get("error", "Start capture failed"))
return CaptureStream(self._sock, header)
# --- Peripherals ---
def insert_detector(self) -> None:
"""Insert the detector (move bellow stage)."""
self._action("insert_detector")
def retract_detector(self) -> None:
"""Retract the detector (move bellow stage)."""
self._action("retract_detector")
def power_on(self) -> None:
"""Turn on the detector power supply."""
self._action("power_on")
def power_off(self) -> None:
"""Turn off the detector power supply."""
self._action("power_off")
def tec_on(self) -> None:
"""Turn on the TEC power supply."""
self._action("tec_on")
def tec_off(self) -> None:
"""Turn off the TEC power supply."""
self._action("tec_off")
@property
def power_status(self) -> dict[int, dict[str, float]]:
"""Detector power supply channel readings: {ch: {voltage, current}}."""
send_msg(self._sock, {"cmd": "power_status"})
resp = recv_msg(self._sock)
if not resp.get("ok"):
raise RuntimeError(resp.get("error", "Failed"))
return {int(k): v for k, v in resp["channels"].items()}
@property
def tec_status(self) -> dict[int, dict[str, float]]:
"""TEC power supply channel readings: {ch: {voltage, current}}."""
send_msg(self._sock, {"cmd": "tec_status"})
resp = recv_msg(self._sock)
if not resp.get("ok"):
raise RuntimeError(resp.get("error", "Failed"))
return {int(k): v for k, v in resp["channels"].items()}
# --- Register access ---
def read_register(self, address: int) -> int:
"""Read a detector register directly."""
send_msg(self._sock, {"cmd": "register_read", "address": address})
resp = recv_msg(self._sock)
if not resp.get("ok"):
raise RuntimeError(resp.get("error", "Register read failed"))
return resp["value"]
def write_register(self, address: int, value: int) -> None:
"""Write a detector register directly."""
send_msg(self._sock, {
"cmd": "register_write", "address": address, "value": value,
})
resp = recv_msg(self._sock)
if not resp.get("ok"):
raise RuntimeError(resp.get("error", "Register write failed"))
# --- Status / reconnect ---
def status(self) -> dict:
"""Get full server status as a dict."""
send_msg(self._sock, {"cmd": "status"})
return recv_msg(self._sock)
def reconnect(self) -> None:
"""Tell the server to reconnect to the detector (after power cycle)."""
send_msg(self._sock, {"cmd": "reconnect"})
resp = recv_msg(self._sock)
if not resp.get("ok"):
raise RuntimeError(resp.get("error", "Reconnect failed"))
# --- Internal ---
def _get(self, key: str) -> Any:
send_msg(self._sock, {"cmd": "get", "key": key})
resp = recv_msg(self._sock)
if not resp.get("ok"):
raise RuntimeError(resp.get("error", f"Failed to get {key}"))
return resp["value"]
def _set(self, key: str, value: Any) -> None:
send_msg(self._sock, {"cmd": "set", "key": key, "value": value})
resp = recv_msg(self._sock)
if not resp.get("ok"):
raise RuntimeError(resp.get("error", f"Failed to set {key}"))
def _action(self, cmd: str) -> None:
send_msg(self._sock, {"cmd": cmd})
resp = recv_msg(self._sock)
if not resp.get("ok"):
raise RuntimeError(resp.get("error", f"Action {cmd} failed"))
def close(self) -> None:
"""Close the connection."""
try:
self._sock.close()
except Exception:
pass
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
def __repr__(self) -> str:
return f"Client({self._host!r}, port={self._port})"
def _make_pbar(total=None, desc="", unit="it"):
"""Create a tqdm progress bar if available, else None."""
try:
from tqdm.auto import tqdm
return tqdm(total=total, desc=desc, unit=unit)
except ImportError:
return None

177
src/team1k/config.py Normal file
View File

@@ -0,0 +1,177 @@
"""
YAML configuration loader.
Replaces the old parameter file parser with a structured YAML config
for all detector, server, and peripheral settings.
"""
from __future__ import annotations
from collections import OrderedDict
from enum import IntEnum
from pydantic import Field, BaseModel, PlainSerializer, field_serializer
from pydantic_yaml import parse_yaml_raw_as, to_yaml_file, to_yaml_str
import logging
from pathlib import Path
from typing import Any, Annotated
import os
logger = logging.getLogger(__name__)
class ExposureMode(IntEnum):
ROLLING_SHUTTER = 0
ROLLING_SHUTTER_WITH_PAUSE = 1
GLOBAL_SHUTTER = 2
GLOBAL_SHUTTER_CDS = 3
class TriggerMode(IntEnum):
INTERNAL = 0
EXTERNAL = 1
# Support str and int values for convenience
@classmethod
def _missing_(cls, value: Any) -> TriggerMode:
if isinstance(value, str):
return cls[value.upper()]
raise ValueError(f"Invalid TriggerMode value: {value}")
class TriggerPolarity(IntEnum):
RISING_EDGE = 0
FALLING_EDGE = 1
# Support str values for convenience
@classmethod
def _missing_(cls, value: Any) -> TriggerPolarity:
if isinstance(value, str):
return cls[value.upper()]
raise ValueError(f"Invalid TriggerPolarity value: {value}")
class BellowStageConfig(BaseModel):
"""Configuration for the bellow stage controller."""
port: str = "/dev/CameraBellowStage"
inserted_position_um: int = 0
retracted_position_um: int = -2_000_000
absolute_positioning: bool = False
baudrate: int = 9600
speed: int = 100_000
class PowerSupplyChannelConfig(BaseModel):
voltage: float = 0.0
current: float = 0.0
ovp: float = 0.0
channel: Annotated[int, Field(ge=1, le=4)] = 1
def ser_number(value: Any) -> int:
return value
class PowerSupplyConfig(BaseModel):
"""Configuration for a programmable power supply."""
port: str = ""
voltage_step: float = 0.2
#channels: list[PowerSupplyChannelConfig] = Field(default=[PowerSupplyChannelConfig(channel=i) for i in range(1, 5)])
channels: dict[Annotated[int, PlainSerializer(ser_number, return_type=int)], PowerSupplyChannelConfig] = Field(default=dict([(i, PowerSupplyChannelConfig(channel=i)) for i in range(1, 5)]))
# @field_serializer('channels')
# def serialize_channels(self, channels: list[PowerSupplyChannelConfig], _info) -> list[dict]:
# # Filter out channels with zero voltage and current for cleaner YAML output
# filtered_items = [item.model_dump() for item in channels if any([item.voltage, item.current, item.ovp])]
# return filtered_items
class DetectorConfig(BaseModel):
"""Configuration for the detector connection."""
ip: str = "10.0.0.32"
register_port: int = 42000
data_port: int = 41000
auto_reconnect: bool = True
reconnect_interval: float = 5.0
class ADCConfig(BaseModel):
"""Configuration for ADC settings."""
clock_frequency_mhz: Annotated[float, Field(ge=50, le=100)] = 50.0
"""ADC clock frequency (50-100 MHz)."""
data_delay: int = 0x180
order_7to0: int = 0x89ABCDEF
order_15to8: int = 0x01234567
class DigitalSignalConfig(BaseModel):
"""Configuration for digital signal settings."""
order_7to0: int = 0x24FFFFFF
order_15to8: int = 0xF5F310FF
polarity: int = 0x0014
class DefaultsConfig(BaseModel):
"""Default values for various parameters."""
exposure_mode: ExposureMode = ExposureMode.GLOBAL_SHUTTER_CDS
trigger_mode: TriggerMode = TriggerMode.EXTERNAL
trigger_polarity: TriggerPolarity = TriggerPolarity.RISING_EDGE
integration_time_ms: float = 4.0
class ServerConfig(BaseModel):
"""Configuration for the server."""
pv_prefix: str = "TEAM1K:"
client_port: int = 42010
class PeripheralConfig(BaseModel):
"""Configuration for peripheral devices."""
bellow_stage: BellowStageConfig = Field(default_factory=BellowStageConfig)
detector_power: PowerSupplyConfig = Field(default_factory=PowerSupplyConfig)
tec: PowerSupplyConfig = Field(default_factory=PowerSupplyConfig)
class Team1kConfig(BaseModel):
"""Complete server configuration loaded from YAML."""
config_version: int = 1
detector: DetectorConfig = Field(default_factory=DetectorConfig)
"""Detector connection settings."""
adc: ADCConfig = Field(default_factory=ADCConfig)
"""ADC configuration parameters."""
digital: DigitalSignalConfig = Field(default_factory=DigitalSignalConfig)
"""Digital signal configuration parameters."""
defaults: DefaultsConfig = Field(default_factory=DefaultsConfig)
"""Default DAQ parameters."""
server: ServerConfig = Field(default_factory=ServerConfig)
"""Server configuration."""
peripherals: PeripheralConfig = Field(default_factory=PeripheralConfig)
"""Peripheral device configurations, including power supplies and bellow stage."""
def load_config(path: str | Path | None = None) -> Team1kConfig:
"""
Load configuration from a YAML file.
Missing keys fall back to defaults. If no path is given,
returns a config with all defaults.
Args:
path: Path to YAML config file, or None for defaults.
Returns:
Populated Team1kConfig.
"""
if path is not None and not Path(path).is_file():
logger.info("Config file not found: %s, creating it using defaults", path)
config = Team1kConfig()
print(config)
print(config.model_dump(mode='python'))
to_yaml_file(path, config)#, add_comments=True)
if path is None:
path = Path(os.getenv("TEAM1K_CONFIG", os.path.join(os.path.expanduser("~"), ".team1k_config.yaml")))
logger.info("No config file specified, attempting %s", path)
path = Path(path)
if not path.is_file():
logger.warning("Config file not found: %s, using defaults", path)
return Team1kConfig()
with open(path) as f:
config = parse_yaml_raw_as(Team1kConfig, f.read())
logger.info("Config loaded: detector=%s:%d, prefix=%s",
config.detector_ip, config.register_port, config.pv_prefix)
logger.debug("Full config:\n%s", to_yaml_str(config))
return config

View File

@@ -4,4 +4,3 @@ from .data_port import DataPort
from .chip_config import ChipConfig, TEAM1K_CONFIG, configure_chip
from .commands import DetectorCommands
from .adc import ADCController
from .parameter_file import apply_parameter_file

View File

@@ -1,179 +0,0 @@
"""
Parameter file parser.
Ports KTestFunctions::ReadParameterFile from KTestFunctions.cxx (lines 1078-1195).
"""
import logging
from pathlib import Path
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .registers import RegisterInterface
from .commands import DetectorCommands
from .adc import ADCController
logger = logging.getLogger(__name__)
def _parse_int(s: str) -> int:
"""Parse an integer from string, supporting both decimal and hex (0x prefix)."""
s = s.strip()
if 'x' in s or 'X' in s:
return int(s, 16)
return int(s)
def apply_parameter_file(
filepath: str,
commands: 'DetectorCommands',
adc: 'ADCController',
registers: 'RegisterInterface',
restart_daq_callback=None,
) -> tuple[bool, str]:
"""
Parse and apply all settings from a parameter file.
The file format is one setting per line:
keyword value1 [value2 ...]
Lines starting with '--' are comments.
Supported keywords:
ADC_clock_frequency_in_MHz <freq>
ADC_data_delay <delay>
chip_type <name> (ignored — only team1k is supported)
exposure_mode <mode>
trigger_mode <external> <polarity>
integration_time <ms>
digital_signal_polarity <value>
digital_signal_order_7downto0 <value>
digital_signal_order_15downto8 <value>
ADC_order_7downto0 <value>
ADC_order_15downto8 <value>
restart_daq
Args:
filepath: Path to the parameter file.
commands: DetectorCommands instance.
adc: ADCController instance.
registers: RegisterInterface instance.
restart_daq_callback: Optional callable to restart DAQ (for 'restart_daq' keyword).
Returns:
Tuple of (success, output_message).
"""
path = Path(filepath)
if not path.is_file():
msg = f"Parameter file not found: {filepath}"
logger.error(msg)
return False, msg
output = [f"Reading parameter file: {filepath}"]
logger.info("Reading parameter file: %s", filepath)
with open(path, 'r') as f:
for line_num, raw_line in enumerate(f, start=1):
line = raw_line.strip()
if not line:
continue
if line.startswith('--'):
continue
parts = line.split()
if not parts:
continue
keyword = parts[0]
args = parts[1:]
ok = False
msg = ""
try:
if keyword == "ADC_clock_frequency_in_MHz":
freq = float(args[0])
ok = adc.set_clock_freq(freq)
msg = f"Set ADC clock frequency to {freq} MHz"
elif keyword == "ADC_data_delay":
delay = _parse_int(args[0])
ok = commands.set_adc_data_delay(delay)
msg = f"Set ADC data delay to 0x{delay:x}"
elif keyword == "chip_type":
if args[0] == "team1k":
ok = True
msg = "Chip type: team1k (confirmed)"
else:
msg = f"Unsupported chip type: {args[0]} (only team1k supported)"
ok = False
elif keyword == "exposure_mode":
mode = _parse_int(args[0])
ok = commands.set_exposure_mode(mode)
msg = f"Set exposure mode to {mode}"
elif keyword == "trigger_mode":
external = bool(_parse_int(args[0]))
polarity = bool(_parse_int(args[1])) if len(args) > 1 else False
ok = commands.set_trigger_mode(external, polarity)
msg = f"Set trigger mode: external={external}, polarity={polarity}"
elif keyword == "integration_time":
time_ms = float(args[0])
ok = commands.set_integration_time(time_ms)
msg = f"Set integration time to {time_ms} ms"
elif keyword == "digital_signal_polarity":
val = _parse_int(args[0])
ok = registers.write_register(27, val)
msg = f"Set digital signal polarity to {args[0]}"
elif keyword == "digital_signal_order_7downto0":
val = _parse_int(args[0])
ok = registers.write_register(28, val)
msg = f"Set digital signals 7:0 order to {args[0]}"
elif keyword == "digital_signal_order_15downto8":
val = _parse_int(args[0])
ok = registers.write_register(29, val)
msg = f"Set digital signals 15:8 order to {args[0]}"
elif keyword == "ADC_order_7downto0":
val = _parse_int(args[0])
ok = registers.write_register(30, val)
msg = f"Set ADC 7:0 order to {args[0]}"
elif keyword == "ADC_order_15downto8":
val = _parse_int(args[0])
ok = registers.write_register(31, val)
msg = f"Set ADC 15:8 order to {args[0]}"
elif keyword == "restart_daq":
if restart_daq_callback:
ok = restart_daq_callback()
else:
ok = True
msg = "Restarted DAQ"
else:
msg = f"Unknown keyword: {keyword}"
ok = False
except (IndexError, ValueError) as e:
msg = f"Error parsing line {line_num}: {line} ({e})"
ok = False
if ok:
output.append(f" {msg}")
else:
error_msg = f" Line {line_num}: {msg or f'Error: {line}'}"
output.append(error_msg)
logger.error("Parameter file error at line %d: %s", line_num, msg or line)
result = "\n".join(output)
return False, result
output.append(f"Done reading parameter file: {filepath}")
result = "\n".join(output)
logger.info("Parameter file applied successfully")
return True, result

View File

@@ -1 +0,0 @@
from .capture import FrameCapture, DataTransferServer

View File

@@ -1,264 +0,0 @@
"""
On-demand frame capture and ZMQ data transfer.
Instead of continuous local file writing, this module captures a requested
number of frames from the shared memory ring buffer into a temp file, then
serves them to remote clients via ZMQ. This is designed for the common case
where the server runs on a dedicated detector machine and clients need to
retrieve data over a (slower) network.
Flow:
1. Client triggers capture via PVA: pvput TEAM1K:CMD:CAPTURE <num_frames>
2. FrameCapture thread reads N frames from shmring → temp file
3. Client polls TEAM1K:CAPTURE:STATUS via PVA until "READY"
4. Client fetches data via ZMQ: sends b"FETCH", receives [header, data]
"""
import json
import logging
import tempfile
import threading
from pathlib import Path
import numpy as np
import zmq
from ..detector.chip_config import ChipConfig, TEAM1K_CONFIG
logger = logging.getLogger(__name__)
DEFAULT_ZMQ_PORT = 42005
class FrameCapture(threading.Thread):
"""
Captures frames from shmring on demand.
When triggered via capture(), reads N consecutive frames from the ring
buffer and writes them to a temp file. Status is tracked and exposed
for the PVA interface.
"""
def __init__(self, ring_name: str, chip_config: ChipConfig = TEAM1K_CONFIG):
super().__init__(daemon=True, name="team1k-capture")
self._ring_name = ring_name
self._config = chip_config
self._shutdown = threading.Event()
# Capture state
self._capture_event = threading.Event()
self._capture_done = threading.Event()
self._num_requested = 0
self._num_captured = 0
self._status = "IDLE"
self._error_msg = ""
self._temp_path: str | None = None
self._data_lock = threading.Lock()
@property
def status(self) -> str:
return self._status
@property
def num_captured(self) -> int:
return self._num_captured
@property
def num_requested(self) -> int:
return self._num_requested
@property
def error_message(self) -> str:
return self._error_msg
def capture(self, num_frames: int) -> None:
"""Request capture of N frames from the ring buffer."""
self._num_requested = num_frames
self._num_captured = 0
self._status = "CAPTURING"
self._error_msg = ""
self._capture_done.clear()
self._capture_event.set()
logger.info("Capture requested: %d frames", num_frames)
def wait_complete(self, timeout: float | None = None) -> bool:
"""Wait for current capture to complete. Returns True if complete."""
return self._capture_done.wait(timeout)
def get_data(self) -> tuple[dict, bytes] | None:
"""
Read captured data. Returns (header_dict, raw_bytes) or None.
The caller should hold this briefly — a new capture will block
until this returns.
"""
with self._data_lock:
if self._status != "READY" or not self._temp_path:
return None
header = {
"num_frames": self._num_captured,
"nx": self._config.image_nx,
"ny": self._config.image_ny,
"dtype": "uint16",
}
try:
data = Path(self._temp_path).read_bytes()
except Exception as e:
logger.error("Failed to read capture file: %s", e)
return None
return header, data
def run(self) -> None:
"""Main thread loop: wait for capture requests."""
from locking_shmring import RingBufferReader
reader = RingBufferReader(self._ring_name)
logger.info("Frame capture connected to ring buffer '%s'", self._ring_name)
try:
while not self._shutdown.is_set():
if not self._capture_event.wait(0.2):
continue
self._capture_event.clear()
self._do_capture(reader)
except Exception as e:
logger.error("Frame capture thread error: %s", e)
finally:
reader.close()
def _do_capture(self, reader) -> None:
"""Capture frames to a temp file."""
# Clean up previous temp file (under lock so concurrent reads finish)
with self._data_lock:
if self._temp_path:
try:
Path(self._temp_path).unlink(missing_ok=True)
except Exception:
pass
self._temp_path = None
try:
fd, temp_path = tempfile.mkstemp(suffix='.raw', prefix='team1k_capture_')
with open(fd, 'wb') as f:
for slot in reader.iter_frames(timeout_ms=5000):
if self._shutdown.is_set():
self._status = "ERROR"
self._error_msg = "Shutdown during capture"
self._capture_done.set()
return
with slot:
frame = slot.data_as_numpy(
(self._config.image_ny, self._config.image_nx),
np.uint16,
)
f.write(frame.tobytes())
self._num_captured += 1
if self._num_captured >= self._num_requested:
break
if self._num_captured < self._num_requested:
self._status = "ERROR"
self._error_msg = (
f"Only captured {self._num_captured}/{self._num_requested} "
f"frames (ring timeout)"
)
logger.error(self._error_msg)
else:
with self._data_lock:
self._temp_path = temp_path
self._status = "READY"
logger.info("Capture complete: %d frames (%s)",
self._num_captured, temp_path)
except Exception as e:
self._status = "ERROR"
self._error_msg = str(e)
logger.error("Capture error: %s", e)
self._capture_done.set()
def stop(self) -> None:
"""Stop the capture thread and clean up."""
self._shutdown.set()
if self._temp_path:
try:
Path(self._temp_path).unlink(missing_ok=True)
except Exception:
pass
class DataTransferServer(threading.Thread):
"""
ZMQ server for transferring captured frame data to clients.
Runs a REP socket. Supports two request types:
b"FETCH" — returns [header_json, frame_data_bytes]
b"STATUS" — returns JSON with capture status
"""
def __init__(self, capture: FrameCapture, chip_config: ChipConfig = TEAM1K_CONFIG,
port: int = DEFAULT_ZMQ_PORT):
super().__init__(daemon=True, name="team1k-data-transfer")
self._capture = capture
self._config = chip_config
self._port = port
self._shutdown = threading.Event()
def run(self) -> None:
ctx = zmq.Context()
sock = ctx.socket(zmq.REP)
sock.bind(f"tcp://*:{self._port}")
sock.setsockopt(zmq.RCVTIMEO, 200)
logger.info("Data transfer server listening on tcp://*:%d", self._port)
try:
while not self._shutdown.is_set():
try:
msg = sock.recv()
except zmq.Again:
continue
self._handle(sock, msg)
finally:
sock.close()
ctx.term()
def _handle(self, sock: zmq.Socket, msg: bytes) -> None:
try:
request = msg.decode('utf-8').strip()
except UnicodeDecodeError:
sock.send(json.dumps({"error": "Invalid request"}).encode())
return
if request == "FETCH":
self._handle_fetch(sock)
elif request == "STATUS":
sock.send(json.dumps({
"status": self._capture.status,
"num_captured": self._capture.num_captured,
"num_requested": self._capture.num_requested,
"error": self._capture.error_message,
}).encode())
else:
sock.send(json.dumps({"error": f"Unknown request: {request}"}).encode())
def _handle_fetch(self, sock: zmq.Socket) -> None:
result = self._capture.get_data()
if result is None:
sock.send_multipart([
json.dumps({
"error": "No data available",
"status": self._capture.status,
}).encode(),
b"",
])
return
header, data = result
sock.send_multipart([json.dumps(header).encode(), data])
logger.info("Sent %d frames (%d bytes) to client",
header["num_frames"], len(data))
def stop(self) -> None:
self._shutdown.set()

View File

@@ -1,2 +1,4 @@
from .bellow_stage import BellowStage
from .power_supply import PowerSupply
from .power_supply import DetectorPowerSupply
from .tec import TECController
from .power_base import PowerSupplyBase

View File

@@ -9,18 +9,15 @@ import logging
import serial
from ..config import BellowStageConfig
logger = logging.getLogger(__name__)
DEFAULT_PORT = '/dev/CameraBellowStage'
DEFAULT_BAUDRATE = 9600
class BellowStage:
"""Controls the camera bellow stage motor via serial port."""
def __init__(self, port: str = DEFAULT_PORT, baudrate: int = DEFAULT_BAUDRATE):
self._port = port
self._baudrate = baudrate
def __init__(self, config: BellowStageConfig):
self._config = config
self.position: int = 0
self.is_moving: bool = False
@@ -38,42 +35,40 @@ class BellowStage:
logger.info("%s detector", action)
self.is_moving = True
if insert:
target_position = self._config.inserted_position_um
else:
target_position = self._config.retracted_position_um
try:
with serial.Serial(self._port, self._baudrate, timeout=1) as ser:
time.sleep(0.5)
ser.write(b'\r')
time.sleep(0.5)
ser.write(b'\x03') # Ctrl-C to stop any ongoing movement
time.sleep(0.5)
with serial.Serial(self._config.port, self._config.baudrate, timeout=1) as ser:
ser.write(b'\r') # Clear any previous command
ser.write(b'\x03') # Ctrl-C to stop any ongoing movement, restart the stage
ser.write(b'rc=100\r')
time.sleep(0.5)
if insert:
ser.write(b'vm=100000\r')
time.sleep(0.5)
ser.write(b'mr 2000000\r')
else:
ser.write(b'vm=100001\r')
time.sleep(0.5)
ser.write(b'mr -2000000\r')
# Wait for movement
time.sleep(10)
# Read position
ser.write(b'pr p\r')
time.sleep(0.5)
response_bytes = ser.read_all()
response = response_bytes.decode('utf-8', errors='ignore') if response_bytes else ""
for line in response.splitlines():
if line.strip().startswith('p='):
try:
self.position = int(line.strip()[2:])
except ValueError:
pass
for _ in range(3):
ser.write(b'EM=3\r') # Set ECHO mode to only respond after command completion
if ser.readline().strip() == b'EM=3':
break
else:
logger.error("Failed to set ECHO mode on bellows stage.")
return False, "Failed to set ECHO mode on bellows stage."
ser.write(f'RC=100\r'.encode()) # Set the run current
ser.readline() # Wait for command finish
ser.write(f'VM={self._config.speed}\r'.encode()) # Set max velocity
ser.readline() # Wait for command finish
self.is_moving = True
if self._config.absolute_positioning:
ser.write(f'MA={target_position}\r'.encode()) # Move to absolute position
else:
ser.write(f'MR={target_position}\r'.encode()) # Move relative to current position
ser.readline() # Wait for command finish
ser.write(b'pr p\r') # Print current position
ser.readline() # Read echo
self.position = int(ser.readline()) # Read position
self.is_moving = False
logger.info("Detector %s successfully (position=%d)",

View File

@@ -0,0 +1,149 @@
"""
Base class for VISA-controlled programmable power supplies.
Used by both the detector power supply and TEC controller.
Based on Rohde & Schwarz HMP series SCPI commands.
"""
import logging
import threading
import dataclasses
import pyvisa
logger = logging.getLogger(__name__)
class PowerSupplyBase:
"""
Programmable power supply controller via VISA (serial).
Args:
port: Device port (e.g., "/dev/DetectorPowerSupply" or "COM3").
settings: Channel settings: {ch_num: (voltage, current, ovp)}.
voltage_step: Voltage step size for ramping (default 0.2V).
name: Human-readable name for logging.
"""
@dataclasses.dataclass
class ChannelStatus:
voltage: float
current: float
def __init__(self, port: str,
settings: dict[int, tuple[float, float, float]],
voltage_step: float = 0.2,
name: str = "Power Supply"):
self._port = f"ASRL{port}::INSTR"
self._settings = settings
self._voltage_step = voltage_step
self.name = name
self._is_on = False
self._lock = threading.Lock()
def _open_resource(self) -> pyvisa.Resource:
"""Open the VISA resource."""
rm = pyvisa.ResourceManager("@py")
return rm.open_resource(self._port)
def initialize(self) -> None:
"""
Configure all channels with their voltage, current, and OVP settings.
All outputs are disabled during configuration.
"""
with self._lock:
try:
inst = self._open_resource()
inst.write("OUTP:GEN 0 \n")
for ch, (voltage, current, ovp) in self._settings.items():
inst.write(f"INST:NSEL {ch} \n")
inst.write(f"OUTP:SEL 0 \n")
inst.write(f"APPL {voltage}, {current} \n")
inst.write(f"SOUR:VOLT:PROT {ovp} \n")
inst.write(f"SOUR:VOLT:STEP {self._voltage_step} \n")
volt = float(inst.query("SOUR:VOLT? \n"))
curr = float(inst.query("SOUR:CURR? \n"))
vpro = float(inst.query("SOUR:VOLT:PROT? \n"))
logger.debug("%s Ch%d: %.4fV, %.4fA, %.4fV-OVP",
self.name, ch, volt, curr, vpro)
inst.close()
logger.info("%s initialized", self.name)
except Exception as e:
logger.error("%s initialization failed: %s", self.name, e)
raise
def power_on(self) -> None:
"""Turn on all configured channels."""
with self._lock:
try:
if not self._is_on:
self.initialize()
inst = self._open_resource()
for ch, (voltage, _, _) in self._settings.items():
output_on = voltage > 0
inst.write(f"INST:NSEL {ch} \n")
inst.write(f"OUTP:SEL {1 if output_on else 0} \n")
inst.write("OUTP:GEN 1 \n")
inst.close()
self._is_on = True
logger.info("%s turned ON", self.name)
except Exception as e:
logger.error("%s power on failed: %s", self.name, e)
raise
def power_off(self) -> None:
"""Turn off all channels."""
with self._lock:
try:
inst = self._open_resource()
inst.write("OUTP:GEN 0 \n")
for ch in self._settings:
inst.write(f"INST:NSEL {ch} \n")
inst.write(f"OUTP:SEL 0 \n")
inst.close()
self._is_on = False
logger.info("%s turned OFF", self.name)
except Exception as e:
logger.error("%s power off failed: %s", self.name, e)
raise
def get_status(self) -> dict[int, ChannelStatus]:
"""
Read measured voltage and current for each active channel.
Returns:
{channel: {"voltage": float, "current": float}}
"""
with self._lock:
status = {}
try:
inst = self._open_resource()
for ch, (voltage, _, _) in self._settings.items():
if voltage == 0:
continue
inst.write(f"INST:NSEL {ch} \n")
volt = float(inst.query("MEAS:VOLT? \n"))
curr = float(inst.query("MEAS:CURR? \n"))
status[ch] = self.ChannelStatus(voltage=volt, current=curr)
inst.close()
except Exception as e:
logger.debug("%s status read failed: %s", self.name, e)
return status
@property
def is_on(self) -> bool:
return self._is_on

View File

@@ -1,37 +1,59 @@
"""
Power supply control.
Detector power supply controller.
Extracted from the existing server.py.
Wraps PowerSupplyBase with the detector-specific VISA port and
channel settings from the YAML config.
"""
import logging
from .power_base import PowerSupplyBase
from ..config import PowerSupplyConfig
logger = logging.getLogger(__name__)
class PowerSupply:
"""Controls the detector power supply."""
class DetectorPowerSupply(PowerSupplyBase):
"""
Detector power supply.
def __init__(self):
self.is_on: bool = False
self.voltages: list[float] = []
Instantiated from the config's ``detector_power`` section.
If no port is configured, all operations are no-ops.
"""
def turn_on(self) -> tuple[bool, str | None]:
"""Turn on the power supply."""
# TODO: Implement actual power supply control
# The original server.py had placeholder serial port communication
self.is_on = True
logger.info("Power supply turned on")
return True, None
def __init__(self, config: PowerSupplyConfig):
self._enabled = bool(config.port)
if self._enabled:
super().__init__(
port=config.port,
settings=config.settings,
voltage_step=config.voltage_step,
name="Detector Power Supply",
)
else:
self._is_on = False
self._lock = None
logger.info("Detector power supply not configured (no port)")
def turn_off(self) -> tuple[bool, str | None]:
"""Turn off the power supply."""
self.is_on = False
logger.info("Power supply turned off")
return True, None
def initialize(self) -> None:
if self._enabled:
super().initialize()
def read_voltages(self) -> tuple[list[float] | None, str | None]:
"""Read voltage values from the power supply."""
# TODO: Implement actual voltage reading
logger.info("Reading power supply voltages")
return self.voltages, None
def power_on(self) -> None:
if self._enabled:
super().power_on()
else:
self._is_on = True
logger.info("Detector power supply: power_on (no-op, not configured)")
def power_off(self) -> None:
if self._enabled:
super().power_off()
else:
self._is_on = False
logger.info("Detector power supply: power_off (no-op, not configured)")
def get_status(self) -> dict[int, PowerSupplyBase.ChannelStatus]:
if self._enabled:
return super().get_status()
return {}

View File

@@ -0,0 +1,59 @@
"""
TEC (thermoelectric cooler) power supply controller.
Wraps PowerSupplyBase with the TEC-specific VISA port and
channel settings from the YAML config.
"""
import logging
from .power_base import PowerSupplyBase
from ..config import PowerSupplyConfig
logger = logging.getLogger(__name__)
class TECController(PowerSupplyBase):
"""
TEC power supply.
Instantiated from the config's ``tec`` section.
If no port is configured, all operations are no-ops.
"""
def __init__(self, config: PowerSupplyConfig):
self._enabled = bool(config.port)
if self._enabled:
super().__init__(
port=config.port,
settings=config.settings,
voltage_step=config.voltage_step,
name="TEC Power Supply",
)
else:
self._is_on = False
self._lock = None
logger.info("TEC power supply not configured (no port)")
def initialize(self) -> None:
if self._enabled:
super().initialize()
def power_on(self) -> None:
if self._enabled:
super().power_on()
else:
self._is_on = True
logger.info("TEC: power_on (no-op, not configured)")
def power_off(self) -> None:
if self._enabled:
super().power_off()
else:
self._is_on = False
logger.info("TEC: power_off (no-op, not configured)")
def get_status(self) -> dict[int, dict[str, float]]:
if self._enabled:
return super().get_status()
return {}

View File

@@ -1,8 +1,9 @@
"""
PVA server setup, command PVs, and status PVs.
PVA server with areaDetector-style PV naming.
Replaces all ZMQ communication (REQ/REP for commands, PUB for status)
with EPICS PV Access channels.
Uses RW PVs with _RBV readbacks following EPICS areaDetector conventions.
NTEnum for exposure mode, trigger mode, and detector state.
NTScalar bool for toggles (Acquire, Power, TEC, Bellow).
"""
import logging
@@ -11,10 +12,14 @@ from typing import TYPE_CHECKING, Any
import numpy as np
from p4p import Value
from p4p.server import Server, ServerOperation
from p4p.server.thread import SharedPV
from p4p.nt import NTScalar, NTNDArray
from p4p.nt import NTScalar, NTNDArray, NTEnum
from ..state import (
DetectorState, DETECTOR_STATE_CHOICES,
EXPOSURE_MODE_CHOICES, TRIGGER_MODE_CHOICES,
)
if TYPE_CHECKING:
from ..server import Team1kServer
@@ -23,76 +28,84 @@ logger = logging.getLogger(__name__)
class _ReadOnlyHandler:
"""Handler for read-only status PVs."""
"""Handler that rejects writes for read-only PVs."""
def put(self, pv: SharedPV, op: ServerOperation):
op.done(error="PV is read-only")
def rpc(self, pv: SharedPV, op: ServerOperation):
op.done(error="RPC not supported")
class _CommandHandler:
class _RWHandler:
"""
Handler for writable command PVs.
Handler for writable PVs.
When a client puts a value, the handler dispatches to the server.
On put, dispatches to server.apply_parameter(), then updates
both the setpoint PV and the corresponding _RBV PV.
"""
def __init__(self, server: 'Team1kServer', command_name: str):
def __init__(self, server: 'Team1kServer', param_name: str,
rbv_pv: SharedPV | None = None):
self._server = server
self._command = command_name
self._param = param_name
self._rbv = rbv_pv
def put(self, pv: SharedPV, op: ServerOperation):
try:
raw = op.value()
# Extract the actual value from the NTScalar wrapper
value = raw.value if hasattr(raw, 'value') else raw
result = self._server.execute_command(self._command, value)
# For NTEnum, extract the index
if hasattr(raw, 'index'):
value = raw.index
self._server.apply_parameter(self._param, value)
pv.post(raw)
if self._rbv is not None:
self._rbv.post(raw)
op.done()
except Exception as e:
logger.error("Command '%s' failed: %s", self._command, e)
logger.error("Parameter '%s' put failed: %s", self._param, e)
op.done(error=str(e))
class _ActionHandler:
"""Handler for action PVs (bellow insert/retract, etc.)."""
def __init__(self, server: 'Team1kServer', action_name: str):
self._server = server
self._action = action_name
def put(self, pv: SharedPV, op: ServerOperation):
try:
raw = op.value()
value = raw.value if hasattr(raw, 'value') else raw
if value:
self._server.execute_action(self._action)
op.done()
except Exception as e:
logger.error("Action '%s' failed: %s", self._action, e)
op.done(error=str(e))
class PVAInterface:
"""
EPICS PV Access server for commands, status, and data streaming.
EPICS PV Access server following areaDetector conventions.
PV layout:
Status (read-only):
{prefix}STATUS - string array of status lines
{prefix}ACQUIRING - bool, DAQ running
{prefix}FRAME_RATE - float, current frame rate Hz
{prefix}FRAME_COUNT - int, total frames acquired
{prefix}EXPOSURE_MODE - int, current exposure mode
{prefix}TRIGGER_MODE - int, current trigger mode
{prefix}INTEGRATION_TIME - float, integration time ms
{prefix}CAPTURE:STATUS - string, capture state (IDLE/CAPTURING/READY/ERROR)
{prefix}CAPTURE:PROGRESS - int, frames captured so far
{prefix}CAPTURE:TOTAL - int, frames requested
Commands (writable):
{prefix}CMD:EXPOSURE_MODE - int, set exposure mode
{prefix}CMD:TRIGGER_MODE - int, set trigger mode
{prefix}CMD:INTEGRATION_TIME - float, set integration time ms
{prefix}CMD:START_DAQ - int, 1=start, 0=stop
{prefix}CMD:CAPTURE - int, capture N frames
{prefix}CMD:ADC_CLOCK_FREQ - float, set ADC clock MHz
{prefix}CMD:ADC_DATA_DELAY - int, set ADC data delay
{prefix}CMD:PARAMETER_FILE - string, apply parameter file
{prefix}CMD:RESET - int, 1=reset detector connection
{prefix}CMD:TEST_MODE - int, 0/1
Data:
{prefix}IMAGE - NTNDArray, live image stream
Peripherals:
{prefix}BELLOW:CMD - int, 1=insert, 0=retract
{prefix}BELLOW:POSITION - int, current position
{prefix}POWER:CMD - int, 1=on, 0=off
{prefix}POWER:VOLTAGES - float array, voltage readings
PV naming uses RW + _RBV pattern:
{prefix}Acquire / Acquire_RBV bool Start/stop DAQ
{prefix}AcquireTime / AcquireTime_RBV float Integration time (s)
{prefix}ExposureMode / ExposureMode_RBV NTEnum Exposure mode
{prefix}TriggerMode / TriggerMode_RBV NTEnum Trigger mode
{prefix}ArrayCounter_RBV int Frame count
{prefix}FrameRate_RBV float Hz
{prefix}DetectorState_RBV NTEnum State
{prefix}MaxSizeX_RBV / MaxSizeY_RBV int 1024
{prefix}IMAGE NTNDArray
{prefix}Bellow:Insert / Bellow:Retract bool Actions
{prefix}Bellow:Position_RBV int
{prefix}Power:Enable / Power:Enable_RBV bool
{prefix}Power:ChN:Voltage_RBV float
{prefix}Power:ChN:Current_RBV float
{prefix}TEC:Enable / TEC:Enable_RBV bool
{prefix}TEC:ChN:Voltage_RBV float
{prefix}TEC:ChN:Current_RBV float
"""
def __init__(self, server: 'Team1kServer', prefix: str = "TEAM1K:"):
@@ -100,109 +113,143 @@ class PVAInterface:
self._prefix = prefix
self._pvs: dict[str, SharedPV] = {}
self._pva_server: Server | None = None
self._ro_handler = _ReadOnlyHandler()
self._ro = _ReadOnlyHandler()
@property
def image_pv(self) -> SharedPV:
return self._pvs[f"{self._prefix}IMAGE"]
def _make_status_pv(self, nt, initial) -> SharedPV:
"""Create a read-only status PV."""
pv = SharedPV(handler=self._ro_handler, nt=nt, initial=initial)
def _pv_name(self, suffix: str) -> str:
return f"{self._prefix}{suffix}"
def _add_ro(self, suffix: str, nt, initial) -> SharedPV:
"""Create and register a read-only PV."""
pv = SharedPV(handler=self._ro, nt=nt, initial=initial)
self._pvs[self._pv_name(suffix)] = pv
return pv
def _make_command_pv(self, nt, initial, command_name: str) -> SharedPV:
"""Create a writable command PV."""
handler = _CommandHandler(self._server, command_name)
def _add_rw(self, suffix: str, nt, initial,
param_name: str, rbv: SharedPV | None = None) -> SharedPV:
"""Create and register an RW PV with a parameter handler."""
handler = _RWHandler(self._server, param_name, rbv)
pv = SharedPV(handler=handler, nt=nt, initial=initial)
self._pvs[self._pv_name(suffix)] = pv
return pv
def _add_action(self, suffix: str, action_name: str) -> SharedPV:
"""Create and register an action PV (bool, triggers on True)."""
handler = _ActionHandler(self._server, action_name)
pv = SharedPV(handler=handler, nt=NTScalar('?'), initial=False)
self._pvs[self._pv_name(suffix)] = pv
return pv
def setup(self) -> None:
"""Create all PVs and start the PVA server."""
p = self._prefix
chip = self._server.chip_config
# --- Status PVs (read-only) ---
self._pvs[f"{p}STATUS"] = self._make_status_pv(
NTScalar('as'), ['INITIALIZING'])
self._pvs[f"{p}ACQUIRING"] = self._make_status_pv(
NTScalar('?'), False)
self._pvs[f"{p}FRAME_RATE"] = self._make_status_pv(
NTScalar('d'), 0.0)
self._pvs[f"{p}FRAME_COUNT"] = self._make_status_pv(
NTScalar('l'), 0)
self._pvs[f"{p}EXPOSURE_MODE"] = self._make_status_pv(
NTScalar('i'), 0)
self._pvs[f"{p}TRIGGER_MODE"] = self._make_status_pv(
NTScalar('i'), 0)
self._pvs[f"{p}INTEGRATION_TIME"] = self._make_status_pv(
NTScalar('d'), 0.0)
self._pvs[f"{p}CAPTURE:STATUS"] = self._make_status_pv(
NTScalar('s'), "IDLE")
self._pvs[f"{p}CAPTURE:PROGRESS"] = self._make_status_pv(
NTScalar('i'), 0)
self._pvs[f"{p}CAPTURE:TOTAL"] = self._make_status_pv(
NTScalar('i'), 0)
# --- Acquisition control ---
# --- Command PVs (writable) ---
self._pvs[f"{p}CMD:EXPOSURE_MODE"] = self._make_command_pv(
NTScalar('i'), 0, "set_exposure_mode")
self._pvs[f"{p}CMD:TRIGGER_MODE"] = self._make_command_pv(
NTScalar('i'), 0, "set_trigger_mode")
self._pvs[f"{p}CMD:INTEGRATION_TIME"] = self._make_command_pv(
NTScalar('d'), 0.0, "set_integration_time")
self._pvs[f"{p}CMD:START_DAQ"] = self._make_command_pv(
NTScalar('i'), 0, "start_stop_daq")
self._pvs[f"{p}CMD:CAPTURE"] = self._make_command_pv(
NTScalar('i'), 0, "capture_frames")
self._pvs[f"{p}CMD:ADC_CLOCK_FREQ"] = self._make_command_pv(
NTScalar('d'), 60.0, "set_adc_clock_freq")
self._pvs[f"{p}CMD:ADC_DATA_DELAY"] = self._make_command_pv(
NTScalar('i'), 0, "set_adc_data_delay")
self._pvs[f"{p}CMD:PARAMETER_FILE"] = self._make_command_pv(
NTScalar('s'), "", "apply_parameter_file")
self._pvs[f"{p}CMD:RESET"] = self._make_command_pv(
NTScalar('i'), 0, "reset_connection")
self._pvs[f"{p}CMD:TEST_MODE"] = self._make_command_pv(
NTScalar('i'), 0, "set_test_mode")
# Acquire (bool)
acquire_rbv = self._add_ro("Acquire_RBV", NTScalar('?'), False)
self._add_rw("Acquire", NTScalar('?'), False, "acquire", acquire_rbv)
# --- Data PV ---
self._pvs[f"{p}IMAGE"] = SharedPV(
handler=self._ro_handler,
# AcquireTime (float, seconds — converted to ms internally)
acq_time_rbv = self._add_ro("AcquireTime_RBV", NTScalar('d'), 0.0)
self._add_rw("AcquireTime", NTScalar('d'), 0.0,
"acquire_time", acq_time_rbv)
# ExposureMode (NTEnum)
em_initial = {"choices": EXPOSURE_MODE_CHOICES, "index": 0}
em_rbv = self._add_ro("ExposureMode_RBV", NTEnum(), em_initial)
self._add_rw("ExposureMode", NTEnum(), em_initial,
"exposure_mode", em_rbv)
# TriggerMode (NTEnum)
tm_initial = {"choices": TRIGGER_MODE_CHOICES, "index": 0}
tm_rbv = self._add_ro("TriggerMode_RBV", NTEnum(), tm_initial)
self._add_rw("TriggerMode", NTEnum(), tm_initial,
"trigger_mode", tm_rbv)
# --- Status (read-only) ---
self._add_ro("ArrayCounter_RBV", NTScalar('l'), 0)
self._add_ro("FrameRate_RBV", NTScalar('d'), 0.0)
# DetectorState (NTEnum, read-only)
state_initial = {
"choices": DETECTOR_STATE_CHOICES,
"index": DetectorState.DISCONNECTED,
}
self._add_ro("DetectorState_RBV", NTEnum(), state_initial)
# Image dimensions
self._add_ro("MaxSizeX_RBV", NTScalar('i'), chip.image_nx)
self._add_ro("MaxSizeY_RBV", NTScalar('i'), chip.image_ny)
# --- IMAGE (NTNDArray) ---
self._pvs[self._pv_name("IMAGE")] = SharedPV(
handler=self._ro,
nt=NTNDArray(),
initial=np.zeros((self._server.chip_config.image_ny,
self._server.chip_config.image_nx), dtype=np.uint16),
initial=np.zeros((chip.image_ny, chip.image_nx), dtype=np.uint16),
)
# --- Peripheral PVs ---
self._pvs[f"{p}BELLOW:CMD"] = self._make_command_pv(
NTScalar('i'), 0, "bellow_move")
self._pvs[f"{p}BELLOW:POSITION"] = self._make_status_pv(
NTScalar('i'), 0)
self._pvs[f"{p}POWER:CMD"] = self._make_command_pv(
NTScalar('i'), 0, "power_control")
self._pvs[f"{p}POWER:VOLTAGES"] = self._make_status_pv(
NTScalar('ad'), np.zeros(0, dtype=np.float64))
# --- Bellow ---
self._add_action("Bellow:Insert", "bellow_insert")
self._add_action("Bellow:Retract", "bellow_retract")
self._add_ro("Bellow:Position_RBV", NTScalar('i'), 0)
# Start the PVA server
# --- Detector power ---
power_rbv = self._add_ro("Power:Enable_RBV", NTScalar('?'), False)
self._add_rw("Power:Enable", NTScalar('?'), False,
"power_enable", power_rbv)
# Per-channel voltage/current (created dynamically from config)
for ch in self._server.config.detector_power.channels:
self._add_ro(f"Power:Ch{ch}:Voltage_RBV", NTScalar('d'), 0.0)
self._add_ro(f"Power:Ch{ch}:Current_RBV", NTScalar('d'), 0.0)
# --- TEC ---
tec_rbv = self._add_ro("TEC:Enable_RBV", NTScalar('?'), False)
self._add_rw("TEC:Enable", NTScalar('?'), False,
"tec_enable", tec_rbv)
for ch in self._server.config.tec.channels:
self._add_ro(f"TEC:Ch{ch}:Voltage_RBV", NTScalar('d'), 0.0)
self._add_ro(f"TEC:Ch{ch}:Current_RBV", NTScalar('d'), 0.0)
# Start PVA server
self._pva_server = Server(providers=[self._pvs])
logger.info("PVA server started with %d PVs (prefix=%s)",
len(self._pvs), self._prefix)
def post_status(self, name: str, value: Any) -> None:
"""
Update a status PV.
Args:
name: PV name suffix (without prefix), e.g., 'FRAME_RATE'.
value: New value to post.
"""
full_name = f"{self._prefix}{name}"
pv = self._pvs.get(full_name)
if pv:
def post(self, suffix: str, value: Any) -> None:
"""Update a PV value by suffix (e.g., 'FrameRate_RBV')."""
pv = self._pvs.get(self._pv_name(suffix))
if pv is not None:
try:
pv.post(value)
except Exception as e:
logger.debug("Could not post to %s: %s", full_name, e)
logger.debug("Could not post to %s: %s", suffix, e)
def post_enum(self, suffix: str, index: int, choices: list[str]) -> None:
"""Update an NTEnum PV."""
self.post(suffix, {"choices": choices, "index": index})
def post_state(self, state: DetectorState) -> None:
"""Update the DetectorState_RBV PV."""
self.post_enum("DetectorState_RBV", state.value, DETECTOR_STATE_CHOICES)
def post_exposure_mode(self, mode: int) -> None:
"""Update ExposureMode RBVs."""
val = {"choices": EXPOSURE_MODE_CHOICES, "index": mode}
self.post("ExposureMode_RBV", val)
self.post("ExposureMode", val)
def post_trigger_mode(self, mode: int) -> None:
"""Update TriggerMode RBVs."""
val = {"choices": TRIGGER_MODE_CHOICES, "index": mode}
self.post("TriggerMode_RBV", val)
self.post("TriggerMode", val)
def stop(self) -> None:
"""Stop the PVA server."""

View File

@@ -2,18 +2,19 @@
"""
Team1k Server — Controls the Team1k X-ray detector system.
Communicates directly with the detector via UDP (replacing k_test C++ program).
Uses a dedicated subprocess for high-throughput data acquisition, shared memory
ring buffer for data transfer, and EPICS PV Access for all client communication.
Communicates directly with the detector via UDP. Uses a dedicated subprocess
for high-throughput data acquisition, shared memory ring buffer for data
transfer, and EPICS PV Access for the image stream and basic controls.
Python clients connect via TCP for full control and frame capture.
Architecture:
Main process:
- Detector register read/write (UDP port 42000)
- PVA server (commands, status, data)
- PVA server (areaDetector-style PVs + IMAGE stream)
- PVA streamer thread (shmring -> NTNDArray)
- Frame capture thread (shmring -> temp file, on demand)
- ZMQ data transfer server (temp file -> client)
- Peripheral control (bellow stage, power supply)
- TCP client server (control + buffered frame capture)
- Power supply / TEC monitoring
- Auto-reconnect on detector errors
Acquisition subprocess:
- UDP data reception (port 41000)
@@ -29,269 +30,360 @@ import argparse
import threading
from typing import Any
from .config import Team1kConfig, load_config
from .state import DetectorState
from .detector.registers import RegisterInterface
from .detector.commands import DetectorCommands
from .detector.adc import ADCController
from .detector.chip_config import TEAM1K_CONFIG, configure_chip
from .detector.parameter_file import apply_parameter_file
from .acquisition.receiver import AcquisitionProcess
from .filewriter.capture import FrameCapture, DataTransferServer
from .pva.interface import PVAInterface
from .pva.streamer import PVAStreamer
from .tcp_server import TCPClientServer
from .peripherals.bellow_stage import BellowStage
from .peripherals.power_supply import PowerSupply
from .peripherals.power_supply import DetectorPowerSupply
from .peripherals.tec import TECController
logger = logging.getLogger(__name__)
# Default configuration
DEFAULT_DETECTOR_IP = "10.0.0.32"
DEFAULT_REGISTER_PORT = 42000
DEFAULT_DATA_PORT = 41000
DEFAULT_PV_PREFIX = "TEAM1K:"
DEFAULT_BELLOW_PORT = "/dev/CameraBellowStage"
DEFAULT_ZMQ_PORT = 42005
class Team1kServer:
"""
Main server coordinating all subsystems.
Manages direct UDP communication with the detector, data acquisition
via a dedicated subprocess, PVA streaming, and on-demand frame capture
with ZMQ data transfer to remote clients.
via a dedicated subprocess, PVA streaming, TCP client handling, and
peripheral control. Designed to be resilient to detector errors.
"""
def __init__(self, detector_ip: str = DEFAULT_DETECTOR_IP,
register_port: int = DEFAULT_REGISTER_PORT,
data_port: int = DEFAULT_DATA_PORT,
pv_prefix: str = DEFAULT_PV_PREFIX,
config_file: str | None = None,
bellow_port: str = DEFAULT_BELLOW_PORT,
zmq_port: int = DEFAULT_ZMQ_PORT):
self.detector_ip = detector_ip
self.register_port = register_port
self.data_port = data_port
def __init__(self, config: Team1kConfig):
self.config = config
self.chip_config = TEAM1K_CONFIG
self._config_file = config_file
self._shutdown_event = threading.Event()
self._acquiring = False
self._state_lock = threading.Lock()
self._state = DetectorState.DISCONNECTED
# Detector communication (main process, register port)
self.registers = RegisterInterface(detector_ip, register_port)
self.commands = DetectorCommands(self.registers)
self.adc = ADCController(self.registers)
# Cached parameter values (avoids register reads)
self._params = {
"exposure_mode": config.exposure_mode,
"trigger_mode": config.trigger_mode,
"trigger_polarity": config.trigger_polarity,
"integration_time": config.integration_time_ms,
"frame_rate": 0.0,
"frame_count": 0,
}
# Detector communication
self.registers: RegisterInterface | None = None
self.commands: DetectorCommands | None = None
self.adc: ADCController | None = None
# Acquisition subprocess
self.acquisition = AcquisitionProcess(
detector_ip, data_port,
config.detector_ip, config.data_port,
ring_name="team1k_frames",
num_ring_slots=32,
chip_config=self.chip_config,
)
# PVA interface
self.pva = PVAInterface(self, prefix=pv_prefix)
self.pva = PVAInterface(self, prefix=config.pv_prefix)
# Frame capture + ZMQ data transfer (replaces local file writing)
self.frame_capture = FrameCapture("team1k_frames", self.chip_config)
self.data_server = DataTransferServer(
self.frame_capture, self.chip_config, port=zmq_port,
)
# PVA streamer will be created after PVA setup
# PVA streamer (created after PVA setup)
self._pva_streamer: PVAStreamer | None = None
# TCP client server
self.tcp_server = TCPClientServer(self, port=config.client_port)
# Peripherals
self.bellow_stage = BellowStage(bellow_port)
self.power_supply = PowerSupply()
self.bellow_stage = BellowStage(config.bellow_stage)
self.detector_power = DetectorPowerSupply(config.detector_power)
self.tec = TECController(config.tec)
def initialize(self) -> None:
"""
Initialize the detector.
@property
def state(self) -> DetectorState:
with self._state_lock:
return self._state
Equivalent to the KTestFunctions constructor sequence:
1. Read firmware version
2. Configure chip type
3. Set default exposure mode, trigger, integration time
4. Set ADC clock frequency
5. Set ADC data delay
6. Optionally apply parameter file
"""
logger.info("Initializing detector at %s:%d", self.detector_ip, self.register_port)
@state.setter
def state(self, new_state: DetectorState) -> None:
with self._state_lock:
old = self._state
self._state = new_state
if old != new_state:
logger.info("State: %s -> %s", old.name, new_state.name)
try:
self.pva.post_state(new_state)
except Exception:
pass
# Read firmware version
def _connect_detector(self) -> bool:
"""Create register interface and command objects. Returns success."""
try:
version = self.commands.read_firmware_version()
logger.info("Firmware version: %d", version)
if self.registers is not None:
try:
self.registers.close()
except Exception:
pass
self.registers = RegisterInterface(
self.config.detector_ip, self.config.register_port,
)
self.commands = DetectorCommands(self.registers)
self.adc = ADCController(self.registers)
return True
except Exception as e:
logger.warning("Could not read firmware version: %s", e)
logger.error("Failed to connect to detector: %s", e)
return False
# Configure chip
configure_chip(self.registers, self.chip_config)
# Set defaults (matching KTestFunctions constructor)
self.commands.set_exposure_mode(3) # Global shutter CDS
self.commands.set_trigger_mode(False) # Internal trigger
self.commands.set_integration_time(6.0) # 6 ms
self.adc.set_clock_freq(60.0) # 60 MHz
self.commands.set_adc_data_delay(0x1A7)
self.commands.set_adc_data_averaging(0)
self.commands.enable_fpga_test_data(False)
# Apply parameter file if provided
if self._config_file:
ok, msg = apply_parameter_file(
self._config_file, self.commands, self.adc, self.registers,
restart_daq_callback=lambda: self._restart_daq()
)
if not ok:
logger.error("Parameter file error:\n%s", msg)
else:
logger.info("Parameter file applied:\n%s", msg)
logger.info("Detector initialized")
def execute_command(self, command: str, value: Any) -> Any:
def initialize(self) -> bool:
"""
Dispatch a command from a PVA put handler.
Initialize the detector with config values.
This is called from p4p's internal thread, so it must be thread-safe.
Register operations are protected by the RegisterInterface's lock.
Returns True on success, False on error (state set to ERROR).
"""
logger.info("Executing command: %s = %s", command, value)
self.state = DetectorState.INITIALIZING
if command == "set_exposure_mode":
self.commands.set_exposure_mode(int(value))
self.pva.post_status("EXPOSURE_MODE", int(value))
if not self._connect_detector():
self.state = DetectorState.ERROR
return False
elif command == "set_trigger_mode":
self.commands.set_trigger_mode(external=bool(int(value)))
self.pva.post_status("TRIGGER_MODE", int(value))
try:
# Firmware version
try:
version = self.commands.read_firmware_version()
logger.info("Firmware version: %d", version)
except Exception as e:
logger.warning("Could not read firmware version: %s", e)
elif command == "set_integration_time":
self.commands.set_integration_time(float(value))
self.pva.post_status("INTEGRATION_TIME", float(value))
# Chip configuration
configure_chip(self.registers, self.chip_config)
elif command == "start_stop_daq":
if int(value):
self._start_daq()
else:
self._stop_daq()
elif command == "capture_frames":
num = int(value)
if num <= 0:
raise ValueError(f"Invalid frame count: {num}")
self.frame_capture.capture(num)
self.pva.post_status("CAPTURE:STATUS", "CAPTURING")
self.pva.post_status("CAPTURE:TOTAL", num)
self.pva.post_status("CAPTURE:PROGRESS", 0)
elif command == "set_adc_clock_freq":
self.adc.set_clock_freq(float(value))
elif command == "set_adc_data_delay":
self.commands.set_adc_data_delay(int(value))
elif command == "apply_parameter_file":
ok, msg = apply_parameter_file(
str(value), self.commands, self.adc, self.registers,
restart_daq_callback=lambda: self._restart_daq()
# Apply config defaults
self.commands.set_exposure_mode(self.config.exposure_mode)
self.commands.set_trigger_mode(
external=self.config.trigger_external,
polarity=not self.config.trigger_polarity_rising,
)
if not ok:
raise RuntimeError(msg)
self.pva.post_status("STATUS", [msg])
self.commands.set_integration_time(self.config.integration_time_ms)
self.adc.set_clock_freq(self.config.adc_clock_frequency_mhz)
self.commands.set_adc_data_delay(self.config.adc_data_delay)
self.commands.set_adc_data_averaging(0)
self.commands.enable_fpga_test_data(False)
elif command == "reset_connection":
if int(value):
self._stop_daq()
self.registers.close()
self.registers = RegisterInterface(self.detector_ip, self.register_port)
self.commands = DetectorCommands(self.registers)
self.adc = ADCController(self.registers)
self.initialize()
# ADC order registers
self.registers.write_register(30, self.config.adc_order_7to0)
self.registers.write_register(31, self.config.adc_order_15to8)
elif command == "set_test_mode":
self.commands.enable_fpga_test_data(bool(int(value)))
# Digital signal registers
self.registers.write_register(27, self.config.digital_polarity)
self.registers.write_register(28, self.config.digital_order_7to0)
self.registers.write_register(29, self.config.digital_order_15to8)
elif command == "bellow_move":
insert = bool(int(value))
success, error = self.bellow_stage.move(insert)
# Update cached params
self._params["exposure_mode"] = self.config.exposure_mode
self._params["trigger_mode"] = (
1 if self.config.trigger_external else 0
)
self._params["integration_time"] = self.config.integration_time_ms
self.state = DetectorState.IDLE
logger.info("Detector initialized")
return True
except Exception as e:
logger.error("Initialization failed: %s", e)
self.state = DetectorState.ERROR
return False
def get_parameter(self, key: str) -> Any:
"""Get a cached parameter value."""
if key in self._params:
return self._params[key]
raise ValueError(f"Unknown parameter: {key}")
def apply_parameter(self, name: str, value: Any) -> None:
"""
Apply a parameter change. Used by both PVA and TCP interfaces.
Writes to the detector, updates the cache, and posts PVA RBVs.
"""
if self.commands is None:
raise RuntimeError("Detector not connected")
try:
if name == "exposure_mode":
mode = int(value)
self.commands.set_exposure_mode(mode)
self._params["exposure_mode"] = mode
self.pva.post_exposure_mode(mode)
elif name == "trigger_mode" or name == "trigger_polarity":
if name == "trigger_mode":
mode = int(value)
polarity = self.config.trigger_polarity
else:
mode = self.config.trigger_mode
polarity = int(value)
self.commands.set_trigger_mode(external=bool(mode), polarity=bool(polarity))
self._params["trigger_mode"] = mode
self._params["trigger_polarity"] = polarity
self.pva.post_trigger_mode(mode)
elif name in ("integration_time", "acquire_time"):
# PVA sends seconds (areaDetector convention), TCP sends ms
ms = float(value)
if name == "acquire_time":
ms = float(value) * 1000.0 # seconds to ms
self.commands.set_integration_time(ms)
self._params["integration_time"] = ms
self.pva.post("AcquireTime_RBV", ms / 1000.0)
elif name == "acquire":
if bool(value):
self._start_daq()
else:
self._stop_daq()
elif name == "power_enable":
if bool(value):
self.detector_power.power_on()
else:
self.detector_power.power_off()
self.pva.post("Power:Enable_RBV", bool(value))
elif name == "tec_enable":
if bool(value):
self.tec.power_on()
else:
self.tec.power_off()
self.pva.post("TEC:Enable_RBV", bool(value))
else:
raise ValueError(f"Unknown parameter: {name}")
except Exception as e:
# On register errors, transition to ERROR
if "register" in str(e).lower() or "udp" in str(e).lower():
self.state = DetectorState.ERROR
raise
def execute_action(self, action: str) -> None:
"""Execute a one-shot action (bellow, power, etc.)."""
if action == "bellow_insert":
success, error = self.bellow_stage.move(insert=True)
if not success:
raise RuntimeError(error)
self.pva.post_status("BELLOW:POSITION", self.bellow_stage.position)
self.pva.post("Bellow:Position_RBV", self.bellow_stage.position)
elif command == "power_control":
if int(value):
success, error = self.power_supply.turn_on()
else:
success, error = self.power_supply.turn_off()
elif action == "bellow_retract":
success, error = self.bellow_stage.move(insert=False)
if not success:
raise RuntimeError(error)
self.pva.post("Bellow:Position_RBV", self.bellow_stage.position)
elif action == "power_on":
self.detector_power.power_on()
self.pva.post("Power:Enable_RBV", True)
elif action == "power_off":
self.detector_power.power_off()
self.pva.post("Power:Enable_RBV", False)
elif action == "tec_on":
self.tec.power_on()
self.pva.post("TEC:Enable_RBV", True)
elif action == "tec_off":
self.tec.power_off()
self.pva.post("TEC:Enable_RBV", False)
else:
raise ValueError(f"Unknown command: {command}")
raise ValueError(f"Unknown action: {action}")
return True
def reconnect(self) -> None:
"""Reconnect to detector after power cycle or error."""
logger.info("Reconnecting to detector...")
self._stop_daq()
if self.initialize():
self._start_daq()
def _start_daq(self) -> None:
"""Start data acquisition."""
if self._acquiring:
if self.state == DetectorState.ACQUIRING:
return
# Start the acquisition subprocess if not running
if self.commands is None:
raise RuntimeError("Detector not connected")
if not self.acquisition.is_alive:
self.acquisition.start_process()
time.sleep(0.5) # Let subprocess initialize
time.sleep(0.5)
# Start data flow on the detector
self.commands.stop_data_flow()
time.sleep(0.01)
self.commands.start_data_flow()
# Tell subprocess to start receiving
self.acquisition.start_acquisition()
self._acquiring = True
self.pva.post_status("ACQUIRING", True)
self.pva.post_status("STATUS", ["ACQUIRING"])
self.state = DetectorState.ACQUIRING
self.pva.post("Acquire_RBV", True)
logger.info("DAQ started")
def _stop_daq(self) -> None:
"""Stop data acquisition."""
if not self._acquiring:
if self.state != DetectorState.ACQUIRING:
return
# Stop data flow on detector
self.commands.stop_data_flow()
try:
if self.commands:
self.commands.stop_data_flow()
self.acquisition.stop_acquisition()
except Exception as e:
logger.warning("Error stopping DAQ: %s", e)
# Stop acquisition subprocess
self.acquisition.stop_acquisition()
self._acquiring = False
self.pva.post_status("ACQUIRING", False)
self.pva.post_status("STATUS", ["IDLE"])
self.state = DetectorState.IDLE
self.pva.post("Acquire_RBV", False)
logger.info("DAQ stopped")
def _restart_daq(self) -> bool:
"""Restart DAQ (stop then start)."""
self._stop_daq()
time.sleep(0.1)
self._start_daq()
return True
def _auto_reconnect_loop(self) -> None:
"""Background thread: auto-reconnect when in ERROR state."""
interval = self.config.reconnect_interval
while not self._shutdown_event.is_set():
if self.state == DetectorState.ERROR:
logger.info("Auto-reconnect: attempting...")
try:
self.reconnect()
except Exception as e:
logger.warning("Auto-reconnect failed: %s", e)
self._shutdown_event.wait(interval)
def _status_update_loop(self) -> None:
"""Periodically update status PVs."""
while not self._shutdown_event.is_set():
try:
if self._pva_streamer:
self.pva.post_status("FRAME_RATE", self._pva_streamer.frame_rate)
self.pva.post_status("FRAME_COUNT", self._pva_streamer.frame_count)
rate = self._pva_streamer.frame_rate
count = self._pva_streamer.frame_count
self._params["frame_rate"] = rate
self._params["frame_count"] = count
self.pva.post("FrameRate_RBV", rate)
self.pva.post("ArrayCounter_RBV", count)
# Power supply status
for ch, readings in self.detector_power.get_status().items():
self.pva.post(f"Power:Ch{ch}:Voltage_RBV",
readings.voltage)
self.pva.post(f"Power:Ch{ch}:Current_RBV",
readings.current)
for ch, readings in self.tec.get_status().items():
self.pva.post(f"TEC:Ch{ch}:Voltage_RBV",
readings.voltage)
self.pva.post(f"TEC:Ch{ch}:Current_RBV",
readings.current)
self.pva.post_status("CAPTURE:STATUS", self.frame_capture.status)
self.pva.post_status("CAPTURE:PROGRESS", self.frame_capture.num_captured)
except Exception as e:
logger.debug("Status update error: %s", e)
@@ -302,22 +394,28 @@ class Team1kServer:
Run the server.
1. Initialize detector
2. Start PVA server
3. Start acquisition subprocess
4. Start file writer and PVA streamer threads
2. Start PVA server + streamer
3. Start acquisition (DAQ runs continuously)
4. Start TCP client server
5. Block until shutdown signal
"""
# Initialize
# Initialize detector
self.initialize()
# Set up PVA server
# PVA server
self.pva.setup()
# Start acquisition subprocess
# Update initial PVA values
self.pva.post_exposure_mode(self._params["exposure_mode"])
self.pva.post_trigger_mode(self._params["trigger_mode"])
self.pva.post("AcquireTime_RBV",
self._params["integration_time"] / 1000.0)
# Acquisition subprocess
self.acquisition.start_process()
time.sleep(0.5)
# Start PVA streamer thread
# PVA streamer
self._pva_streamer = PVAStreamer(
self.acquisition.ring_name,
self.pva.image_pv,
@@ -325,22 +423,32 @@ class Team1kServer:
)
self._pva_streamer.start()
# Start frame capture + ZMQ data transfer threads
self.frame_capture.start()
self.data_server.start()
# TCP client server
self.tcp_server.start()
# Start status update thread
status_thread = threading.Thread(target=self._status_update_loop,
daemon=True, name="team1k-status")
status_thread.start()
# Status update thread
threading.Thread(
target=self._status_update_loop,
daemon=True, name="team1k-status",
).start()
# Update initial status
self.pva.post_status("STATUS", ["READY"])
self.pva.post_status("EXPOSURE_MODE", self.commands.exposure_mode)
# Auto-reconnect thread (if enabled)
if self.config.auto_reconnect:
threading.Thread(
target=self._auto_reconnect_loop,
daemon=True, name="team1k-reconnect",
).start()
# Start DAQ automatically (continuous mode)
if self.state == DetectorState.IDLE:
try:
self._start_daq()
except Exception as e:
logger.error("Failed to start DAQ: %s", e)
logger.info("Server running. Press Ctrl+C to stop.")
# Set up signal handlers
# Signal handlers
def _handle_signal(signum, _frame):
logger.info("Received signal %d, shutting down...", signum)
self._shutdown_event.set()
@@ -350,8 +458,6 @@ class Team1kServer:
# Block until shutdown
self._shutdown_event.wait()
# Shutdown sequence
self.shutdown()
def shutdown(self) -> None:
@@ -359,26 +465,17 @@ class Team1kServer:
logger.info("Shutting down server...")
self._shutdown_event.set()
# Stop DAQ
if self._acquiring:
self._stop_daq()
self._stop_daq()
# Stop PVA streamer
if self._pva_streamer:
self._pva_streamer.stop()
# Stop frame capture + data transfer
self.frame_capture.stop()
self.data_server.stop()
# Shutdown acquisition subprocess
self.tcp_server.stop()
self.acquisition.shutdown()
# Stop PVA server
self.pva.stop()
# Close register interface
self.registers.close()
if self.registers:
self.registers.close()
logger.info("Server shutdown complete")
@@ -386,39 +483,36 @@ class Team1kServer:
def main():
"""Main entry point for the server."""
parser = argparse.ArgumentParser(description="Team1k Detector Server")
parser.add_argument('--detector-ip', default=DEFAULT_DETECTOR_IP,
help=f"Detector IP address (default: {DEFAULT_DETECTOR_IP})")
parser.add_argument('--register-port', type=int, default=DEFAULT_REGISTER_PORT,
help=f"Detector register port (default: {DEFAULT_REGISTER_PORT})")
parser.add_argument('--data-port', type=int, default=DEFAULT_DATA_PORT,
help=f"Detector data port (default: {DEFAULT_DATA_PORT})")
parser.add_argument('--pv-prefix', default=DEFAULT_PV_PREFIX,
help=f"PVA prefix (default: {DEFAULT_PV_PREFIX})")
parser.add_argument('--config', default=None,
help="Parameter file to apply on startup")
parser.add_argument('--bellow-port', default=DEFAULT_BELLOW_PORT,
help=f"Bellow stage serial port (default: {DEFAULT_BELLOW_PORT})")
parser.add_argument('--zmq-port', type=int, default=DEFAULT_ZMQ_PORT,
help=f"ZMQ data transfer port (default: {DEFAULT_ZMQ_PORT})")
parser.add_argument('--log-level', default='INFO',
choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'],
parser.add_argument("--config", default=None,
help="YAML config file path")
parser.add_argument("--detector-ip", default=None,
help="Override detector IP address")
parser.add_argument("--pv-prefix", default=None,
help="Override PVA prefix")
parser.add_argument("--client-port", type=int, default=None,
help="Override TCP client port")
parser.add_argument("--log-level", default="INFO",
choices=["DEBUG", "INFO", "WARNING", "ERROR"],
help="Logging level (default: INFO)")
args = parser.parse_args()
logging.basicConfig(
level=getattr(logging, args.log_level),
format='%(asctime)s %(name)-30s %(levelname)-8s %(message)s',
format="%(asctime)s %(name)-30s %(levelname)-8s %(message)s",
)
server = Team1kServer(
detector_ip=args.detector_ip,
register_port=args.register_port,
data_port=args.data_port,
pv_prefix=args.pv_prefix,
config_file=args.config,
bellow_port=args.bellow_port,
zmq_port=args.zmq_port,
)
# Load config
config = load_config(args.config)
# Apply CLI overrides
if args.detector_ip:
config.detector_ip = args.detector_ip
if args.pv_prefix:
config.pv_prefix = args.pv_prefix
if args.client_port:
config.client_port = args.client_port
server = Team1kServer(config)
try:
server.run()

30
src/team1k/state.py Normal file
View File

@@ -0,0 +1,30 @@
"""
Detector state enum.
Tracks the current state of the detector connection and acquisition.
Used by the server, PVA interface, and TCP client handler.
"""
import enum
class DetectorState(enum.IntEnum):
"""Detector connection and acquisition state."""
DISCONNECTED = 0 # Detector powered off or unreachable
INITIALIZING = 1 # Configuring registers
IDLE = 2 # Ready, DAQ not running
ACQUIRING = 3 # DAQ running, streaming frames
ERROR = 4 # Register error, needs reconnect
@property
def label(self) -> str:
"""Human-readable label for PVA NTEnum."""
return self.name.capitalize()
# Labels for NTEnum PV choices (index matches IntEnum value)
DETECTOR_STATE_CHOICES = [s.label for s in DetectorState]
EXPOSURE_MODE_CHOICES = ["Rolling", "Global", "GlobalFlush", "GlobalCDS"]
TRIGGER_MODE_CHOICES = ["Internal", "External"]

View File

@@ -0,0 +1,71 @@
"""
TCP message framing protocol.
Shared by tcp_server.py (server side) and client.py (client side).
No external dependencies — stdlib only.
Message types:
- JSON messages: 1-byte type + 4-byte big-endian length prefix + UTF-8 JSON
- Binary data: 1-byte type + 8-byte big-endian length prefix + raw bytes
"""
import json
import struct
import socket
from typing import Union
# Length prefix formats
_JSON_HEADER = struct.Struct("!I") # 4-byte unsigned int (max ~4 GB)
_DATA_HEADER = struct.Struct("!Q") # 8-byte unsigned long (supports >4 GB)
def send_msg(sock: socket.socket, obj: dict) -> None:
"""Send a JSON-serializable dict with 4-byte length prefix."""
data = json.dumps(obj, separators=(",", ":")).encode("utf-8")
sock.sendall(b"J" + _JSON_HEADER.pack(len(data)) + data)
def recv_msg(sock: socket.socket) -> dict:
"""Receive a length-prefixed JSON message. Returns parsed dict."""
data = recv_bytes_or_msg(sock)
if isinstance(data, bytes):
raise ValueError("Expected JSON message but got binary data")
return data
def send_bytes(sock: socket.socket, data: bytes | memoryview) -> None:
"""Send raw bytes with 8-byte length prefix."""
sock.sendall(b"D" + _DATA_HEADER.pack(len(data)) + data)
def recv_bytes_or_msg(sock: socket.socket) -> Union[bytes | dict]:
"""Receive length-prefixed binary data (reads the 8-byte header first)."""
type_byte = _recv_exact(sock, 1)
if type_byte == b"J":
# JSON message
raw_len = _recv_exact(sock, _JSON_HEADER.size)
(length,) = _JSON_HEADER.unpack(raw_len)
data = _recv_exact(sock, length)
return json.loads(data)
elif type_byte == b"D":
# Binary data
raw_len = _recv_exact(sock, _DATA_HEADER.size)
(length,) = _DATA_HEADER.unpack(raw_len)
return _recv_exact(sock, length)
else:
raise ValueError(f"Unexpected message type: {type_byte}")
def recv_exact(sock: socket.socket, n: int) -> bytes:
"""Receive exactly n bytes from a socket. Public API."""
return _recv_exact(sock, n)
def _recv_exact(sock: socket.socket, n: int) -> bytes:
"""Receive exactly n bytes, raising ConnectionError on premature close."""
buf = bytearray(n)
view = memoryview(buf)
pos = 0
while pos < n:
nbytes = sock.recv_into(view[pos:])
if nbytes == 0:
raise ConnectionError(
f"Connection closed (received {pos}/{n} bytes)"
)
pos += nbytes
return bytes(buf)

253
src/team1k/tcp_server.py Normal file
View File

@@ -0,0 +1,253 @@
"""
TCP server for Python client connections.
Handles control commands (get/set parameters, register access)
and frame capture requests. Each client gets its own thread.
Only one capture can run at a time (capture lock).
"""
import socket
import logging
import threading
from typing import TYPE_CHECKING, Any
from .tcp_protocol import send_msg, recv_msg
from .capture import BufferedCapture
from .state import DetectorState
if TYPE_CHECKING:
from .server import Team1kServer
logger = logging.getLogger(__name__)
class TCPClientServer(threading.Thread):
"""
TCP server accepting client connections for control and data.
Each client connection is handled in a separate daemon thread.
Control commands are dispatched to server.apply_parameter() or
server.execute_action(). Captures use BufferedCapture.
"""
def __init__(self, server: 'Team1kServer', port: int = 42010):
super().__init__(daemon=True, name="team1k-tcp-server")
self._server = server
self._port = port
self._capture_lock = threading.Lock()
self._shutdown = threading.Event()
self._sock: socket.socket | None = None
def run(self) -> None:
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self._sock.bind(("0.0.0.0", self._port))
self._sock.listen(5)
self._sock.settimeout(0.5)
logger.info("TCP client server listening on port %d", self._port)
while not self._shutdown.is_set():
try:
client_sock, addr = self._sock.accept()
except socket.timeout:
continue
logger.info("Client connected from %s", addr)
t = threading.Thread(
target=self._handle_client,
args=(client_sock, addr),
daemon=True,
name=f"team1k-client-{addr[0]}:{addr[1]}",
)
t.start()
self._sock.close()
def _handle_client(self, sock: socket.socket, addr: tuple) -> None:
"""Handle one client connection."""
sock.settimeout(300) # 5 min idle timeout
try:
while not self._shutdown.is_set():
try:
msg = recv_msg(sock)
except (ConnectionError, socket.timeout):
break
try:
self._dispatch(sock, msg)
except Exception as e:
logger.error("Command error from %s: %s", addr, e)
try:
send_msg(sock, {"ok": False, "error": str(e)})
except ConnectionError:
break
finally:
sock.close()
logger.info("Client disconnected: %s", addr)
def _dispatch(self, sock: socket.socket, msg: dict) -> None:
"""Dispatch a client command."""
cmd = msg.get("cmd", "")
if cmd == "get":
key = msg["key"]
value = self._server.get_parameter(key)
send_msg(sock, {"ok": True, "value": value})
elif cmd == "set":
key = msg["key"]
value = msg["value"]
self._server.apply_parameter(key, value)
send_msg(sock, {"ok": True})
elif cmd == "status":
send_msg(sock, {
"ok": True,
"state": self._server.state.name,
"exposure_mode": self._server.get_parameter("exposure_mode"),
"trigger_mode": self._server.get_parameter("trigger_mode"),
"integration_time": self._server.get_parameter("integration_time"),
"frame_rate": self._server.get_parameter("frame_rate"),
"frame_count": self._server.get_parameter("frame_count"),
"acquiring": self._server.state == DetectorState.ACQUIRING,
"power_on": self._server.detector_power.is_on,
"tec_on": self._server.tec.is_on,
"bellow_inserted": abs(self._server.bellow_stage.position - self._server.config.bellow_stage.inserted_position_um) < 100,
})
elif cmd == "capture":
if not self._capture_lock.acquire(blocking=False):
send_msg(sock, {
"ok": False,
"error": "Another capture is in progress",
})
return
try:
sock.settimeout(None) # no timeout during capture
capture = BufferedCapture(
self._server.acquisition.ring_name,
self._server.chip_config,
)
if "num_frames" in msg:
capture.capture_to_socket(sock, int(msg["num_frames"]))
elif "duration_sec" in msg:
capture.capture_duration_to_socket(
sock, float(msg["duration_sec"]),
)
else:
send_msg(sock, {
"ok": False,
"error": "Specify num_frames or duration_sec",
})
finally:
sock.settimeout(300)
self._capture_lock.release()
elif cmd == "start_capture":
if not self._capture_lock.acquire(blocking=False):
send_msg(sock, {
"ok": False,
"error": "Another capture is in progress",
})
return
try:
sock.settimeout(None)
capture = BufferedCapture(
self._server.acquisition.ring_name,
self._server.chip_config,
)
handle = capture.start_streaming_to_socket(sock)
# Block until the streaming is stopped
# (client sends stop_capture, which we handle below)
# Actually, streaming runs in its own thread and sends
# data until handle.stop_event is set. We store the handle
# and wait for a stop_capture command.
self._active_stream = handle
# Don't send response here — the streaming thread already
# sent the header. We just wait for stop_capture.
# The client connection loop is paused while streaming.
handle.stop_event.wait()
finally:
sock.settimeout(300)
self._capture_lock.release()
self._active_stream = None
elif cmd == "stop_capture":
handle = getattr(self, "_active_stream", None)
if handle:
handle.stop_event.set()
send_msg(sock, {"ok": True})
else:
send_msg(sock, {"ok": False, "error": "No active capture"})
elif cmd == "register_read":
addr = int(msg["address"])
value = self._server.registers.read_register(addr)
send_msg(sock, {"ok": True, "value": value})
elif cmd == "register_write":
addr = int(msg["address"])
value = int(msg["value"])
self._server.registers.write_register(addr, value)
send_msg(sock, {"ok": True})
elif cmd == "reconnect":
self._server.reconnect()
send_msg(sock, {"ok": True})
elif cmd == "insert_detector":
self._server.execute_action("bellow_insert")
send_msg(sock, {"ok": True})
elif cmd == "retract_detector":
self._server.execute_action("bellow_retract")
send_msg(sock, {"ok": True})
elif cmd == "power_on":
self._server.execute_action("power_on")
send_msg(sock, {"ok": True})
elif cmd == "power_off":
self._server.execute_action("power_off")
send_msg(sock, {"ok": True})
elif cmd == "tec_on":
self._server.execute_action("tec_on")
send_msg(sock, {"ok": True})
elif cmd == "tec_off":
self._server.execute_action("tec_off")
send_msg(sock, {"ok": True})
elif cmd == "power_status":
status = self._server.detector_power.get_status()
# Convert int keys to strings for JSON
send_msg(sock, {
"ok": True,
"channels": {str(k): dict(v) for k, v in status.items()},
})
elif cmd == "tec_status":
status = self._server.tec.get_status()
send_msg(sock, {
"ok": True,
"channels": {str(k): dict(v) for k, v in status.items()},
})
elif cmd == "bellow_status":
position = self._server.bellow_stage.position
is_moving = self._server.bellow_stage.is_moving
send_msg(sock, {
"ok": True,
"position_um": position,
"is_moving": is_moving,
"bellow_inserted": abs(position - self._server.config.bellow_stage.inserted_position_um) < 100,
})
else:
send_msg(sock, {"ok": False, "error": f"Unknown command: {cmd}"})
def stop(self) -> None:
"""Signal shutdown."""
self._shutdown.set()