Claude v2

This commit is contained in:
2026-02-27 11:12:33 -06:00
parent 08135c6c13
commit 5a21f38dab
8 changed files with 690 additions and 436 deletions

206
README.md
View File

@@ -1 +1,205 @@
# `team1k` # team1k
Python control library and server for the TEAM1k X-ray detector.
Communicates directly with the detector FPGA via UDP, replacing the previous `k_test` C++ program. Uses EPICS PV Access (PVA) for command/status and ZMQ for bulk data transfer.
## Architecture
```
Client machine Detector machine
+--------------+ +-------------------------------+
| | PVA (commands/status) | team1k-server |
| Client |<---------------------->| Register I/O (UDP:42000) |
| | | PVA server |
| | ZMQ (frame data) | PVA streamer thread |
| |<---------------------->| Frame capture thread |
| | | ZMQ data transfer thread |
+--------------+ | |
| Acquisition subprocess |
| UDP data recv (UDP:41000) |
| Frame assembly |
| -> locking_shmring |
+-------------------------------+
```
## Installation
```bash
pip install .
```
For HDF5 support (optional):
```bash
pip install ".[hdf5]"
```
### Dependencies
- `locking_shmring` — shared memory ring buffer (must be installed separately)
- `p4p` — EPICS PV Access for Python
- `pyzmq` — ZeroMQ for bulk data transfer
- `numpy`, `pyserial`
## Running the server
```bash
team1k-server --detector-ip 10.0.0.32 --log-level INFO
```
All options:
```
--detector-ip Detector IP address (default: 10.0.0.32)
--register-port Detector register port (default: 42000)
--data-port Detector data port (default: 41000)
--pv-prefix PVA prefix (default: TEAM1K:)
--zmq-port ZMQ data transfer port (default: 42005)
--config Parameter file to apply on startup
--bellow-port Bellow stage serial port (default: /dev/CameraBellowStage)
--log-level DEBUG, INFO, WARNING, ERROR (default: INFO)
```
## Client usage
### Acquire frames
```python
from team1k import Client, ExposureModes
client = Client(data_host="detector-machine")
# Configure
client.set_exposure_mode(ExposureModes.GLOBAL_SHUTTER_CDS)
client.set_integration_time(6.0) # ms
# One-shot: start DAQ, capture 100 frames, stop DAQ
frames = client.acquire_frames(100)
print(frames.shape) # (100, 1024, 1024)
print(frames.dtype) # uint16
client.close()
```
### Manual DAQ control
```python
client = Client(data_host="detector-machine")
client.start_daq()
# Grab frames from the running stream
batch1 = client.get_frames(50)
batch2 = client.get_frames(50)
client.stop_daq()
client.close()
```
### Live image monitoring
```python
client = Client()
def on_frame(image):
print(f"Frame received: {image.shape}")
client.monitor_image(on_frame)
# ... later
client.stop_monitor("IMAGE")
client.close()
```
### Status
```python
client = Client()
print(client.get_status())
print(client.is_acquiring())
print(client.get_frame_rate())
print(client.get_exposure_mode())
print(client.get_integration_time())
client.close()
```
### Other commands
```python
client.set_trigger_mode(TriggerModes.EXTERNAL)
client.set_adc_clock_freq(60.0) # MHz
client.set_adc_data_delay(0x1A7)
client.load_parameter_file("/path/to/params.txt")
client.set_test_mode(True) # FPGA test pattern
client.reset_connection()
# Peripherals
client.insert_detector()
client.retract_detector()
client.power_on()
client.power_off()
```
## PV Access interface
All PVs are prefixed with `TEAM1K:` by default.
### Status PVs (read-only)
| PV | Type | Description |
|----|------|-------------|
| `STATUS` | string[] | Server status |
| `ACQUIRING` | bool | DAQ running |
| `FRAME_RATE` | float | Current frame rate (Hz) |
| `FRAME_COUNT` | int | Total frames acquired |
| `EXPOSURE_MODE` | int | Current exposure mode (0-3) |
| `TRIGGER_MODE` | int | Current trigger mode |
| `INTEGRATION_TIME` | float | Integration time (ms) |
| `CAPTURE:STATUS` | string | IDLE / CAPTURING / READY / ERROR |
| `CAPTURE:PROGRESS` | int | Frames captured so far |
| `CAPTURE:TOTAL` | int | Frames requested |
| `IMAGE` | NTNDArray | Live image stream |
### Command PVs (writable)
| PV | Type | Description |
|----|------|-------------|
| `CMD:EXPOSURE_MODE` | int | Set exposure mode |
| `CMD:TRIGGER_MODE` | int | Set trigger mode |
| `CMD:INTEGRATION_TIME` | float | Set integration time (ms) |
| `CMD:START_DAQ` | int | 1=start, 0=stop |
| `CMD:CAPTURE` | int | Capture N frames |
| `CMD:ADC_CLOCK_FREQ` | float | Set ADC clock (MHz) |
| `CMD:ADC_DATA_DELAY` | int | Set ADC data delay |
| `CMD:PARAMETER_FILE` | string | Apply parameter file |
| `CMD:RESET` | int | 1=reset connection |
| `CMD:TEST_MODE` | int | 1=enable FPGA test data |
## Package structure
```
src/team1k/
detector/ # Direct UDP communication with detector FPGA
udp_transport.py # Low-level UDP socket
registers.py # Register read/write protocol
data_port.py # Data port + loopback registration
chip_config.py # Chip constants (1024x1024, packet layout)
commands.py # Detector commands (exposure, trigger, etc.)
adc.py # Si570 I2C clock programming
parameter_file.py # Parameter file parser
acquisition/ # High-throughput data path
receiver.py # Acquisition subprocess (UDP -> shmring)
filewriter/ # On-demand frame capture
capture.py # FrameCapture + ZMQ DataTransferServer
pva/ # EPICS PV Access
interface.py # PVA server, command/status PVs
streamer.py # shmring -> NTNDArray live stream
peripherals/ # Hardware peripherals
bellow_stage.py # Camera bellow stage (serial)
power_supply.py # Power supply control
server.py # Main server entry point
Client.py # PVA + ZMQ client library
```

View File

@@ -15,6 +15,7 @@ dependencies = [
"numpy >= 1.24.0", "numpy >= 1.24.0",
"pyserial >= 3.5", "pyserial >= 3.5",
"p4p >= 4.1.0", "p4p >= 4.1.0",
"pyzmq >= 25.0.0",
] ]
[project.optional-dependencies] [project.optional-dependencies]

View File

@@ -1,23 +1,44 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
Team1k Client — PVA-based interface to the Team1k detector server. Team1k Client — PVA + ZMQ interface to the Team1k detector server.
Replaces the previous ZMQ-based client with EPICS PV Access communication. Commands and status use EPICS PV Access (p4p).
All commands are sent via pvput to command PVs, and status/data are Bulk frame retrieval uses ZMQ for efficient transfer over the network.
received via pvget or monitor subscriptions.
Typical usage::
client = Client()
client.set_exposure_mode(ExposureModes.GLOBAL_SHUTTER_CDS)
client.set_integration_time(6.0)
# One-shot: start DAQ, grab 100 frames, stop DAQ
frames = client.acquire_frames(100)
print(frames.shape) # (100, 1024, 1024)
# Or manual control
client.start_daq()
frames = client.get_frames(50)
client.stop_daq()
client.close()
""" """
import enum import enum
import json
import time
import logging import logging
import threading
from typing import Any, Callable from typing import Any, Callable
import numpy as np import numpy as np
import zmq
from p4p.client.thread import Context, Subscription from p4p.client.thread import Context, Subscription
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
DEFAULT_PV_PREFIX = "TEAM1K:"
DEFAULT_ZMQ_PORT = 42005
class ExposureModes(enum.IntEnum): class ExposureModes(enum.IntEnum):
"""Detector exposure modes.""" """Detector exposure modes."""
@@ -34,28 +55,33 @@ class TriggerModes(enum.IntEnum):
class CommandError(Exception): class CommandError(Exception):
"""Raised when a PVA command put fails.""" """Raised when a PVA command put or ZMQ transfer fails."""
pass pass
class Client: class Client:
""" """
PVA client for the Team1k detector server. Client for the Team1k detector server.
All communication uses EPICS PV Access: Uses PVA for commands/status and ZMQ for bulk frame data transfer.
- Commands: pvput to {prefix}CMD:* PVs
- Status: pvget from {prefix}* status PVs
- Data: monitor on {prefix}IMAGE PV
Args: Args:
prefix: PV name prefix (default: "TEAM1K:"). prefix: PV name prefix (default: "TEAM1K:").
data_host: Hostname/IP of the server's ZMQ data port.
data_port: ZMQ port for frame data transfer.
timeout: Default timeout in seconds for PVA operations. timeout: Default timeout in seconds for PVA operations.
""" """
def __init__(self, prefix: str = "TEAM1K:", timeout: float = 5.0): def __init__(self, prefix: str = DEFAULT_PV_PREFIX,
data_host: str = "localhost",
data_port: int = DEFAULT_ZMQ_PORT,
timeout: float = 5.0):
self._prefix = prefix self._prefix = prefix
self._data_host = data_host
self._data_port = data_port
self._timeout = timeout self._timeout = timeout
self._ctx = Context("pva") self._ctx = Context("pva")
self._zmq_ctx = zmq.Context()
self._subscriptions: dict[str, Subscription] = {} self._subscriptions: dict[str, Subscription] = {}
def _pv(self, name: str) -> str: def _pv(self, name: str) -> str:
@@ -74,10 +100,115 @@ class Client:
"""Get the current value of a PV.""" """Get the current value of a PV."""
pv_name = self._pv(pv_suffix) pv_name = self._pv(pv_suffix)
try: try:
return self._ctx.get(pv_name, timeout=self._timeout) result = self._ctx.get(pv_name, timeout=self._timeout)
return result.value if hasattr(result, 'value') else result
except Exception as e: except Exception as e:
raise CommandError(f"Failed to get {pv_name}: {e}") from e raise CommandError(f"Failed to get {pv_name}: {e}") from e
# --- Frame retrieval (the primary data interface) ---
def get_frames(self, num_frames: int, timeout: float = 120.0) -> np.ndarray:
"""
Capture frames from the running DAQ and transfer them.
DAQ must already be running. This triggers a server-side capture of
N consecutive frames, waits for completion, then fetches the data
over ZMQ.
Args:
num_frames: Number of frames to capture.
timeout: Maximum time to wait for capture + transfer (seconds).
Returns:
numpy array of shape (num_frames, ny, nx) with dtype uint16.
Raises:
CommandError: If capture fails, times out, or transfer errors.
"""
# Trigger server-side capture
self._put("CMD:CAPTURE", num_frames)
# Poll capture status until READY
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
status = self._get("CAPTURE:STATUS")
if status == "READY":
break
if status == "ERROR":
raise CommandError("Server-side capture failed")
time.sleep(0.1)
else:
raise CommandError(
f"Capture timed out after {timeout}s "
f"(status={self._get('CAPTURE:STATUS')})"
)
# Fetch data over ZMQ
remaining = deadline - time.monotonic()
return self._fetch_zmq(timeout=max(remaining, 5.0))
def acquire_frames(self, num_frames: int, timeout: float = 120.0) -> np.ndarray:
"""
Start DAQ, capture N frames, stop DAQ, and return the data.
Convenience method for one-shot acquisitions. Equivalent to::
client.start_daq()
frames = client.get_frames(num_frames)
client.stop_daq()
Args:
num_frames: Number of frames to capture.
timeout: Maximum time to wait (seconds).
Returns:
numpy array of shape (num_frames, ny, nx) with dtype uint16.
"""
self.start_daq()
try:
return self.get_frames(num_frames, timeout=timeout)
finally:
self.stop_daq()
def _fetch_zmq(self, timeout: float = 60.0) -> np.ndarray:
"""Fetch captured frame data from the server via ZMQ."""
sock = self._zmq_ctx.socket(zmq.REQ)
sock.setsockopt(zmq.RCVTIMEO, int(timeout * 1000))
sock.setsockopt(zmq.LINGER, 0)
sock.connect(f"tcp://{self._data_host}:{self._data_port}")
try:
sock.send(b"FETCH")
parts = sock.recv_multipart()
if len(parts) != 2:
raise CommandError(f"Unexpected ZMQ response: {len(parts)} parts")
header = json.loads(parts[0])
if "error" in header:
raise CommandError(f"Server error: {header['error']}")
data = parts[1]
n = header["num_frames"]
ny = header["ny"]
nx = header["nx"]
dtype = np.dtype(header.get("dtype", "uint16"))
expected_bytes = n * ny * nx * dtype.itemsize
if len(data) != expected_bytes:
raise CommandError(
f"Data size mismatch: got {len(data)}, "
f"expected {expected_bytes}"
)
arr = np.frombuffer(data, dtype=dtype).copy()
return arr.reshape(n, ny, nx)
except zmq.Again:
raise CommandError(f"ZMQ fetch timed out after {timeout}s")
finally:
sock.close()
# --- Detector commands --- # --- Detector commands ---
def set_exposure_mode(self, mode: ExposureModes | int) -> None: def set_exposure_mode(self, mode: ExposureModes | int) -> None:
@@ -100,18 +231,6 @@ class Client:
"""Stop data acquisition.""" """Stop data acquisition."""
self._put("CMD:START_DAQ", 0) self._put("CMD:START_DAQ", 0)
def set_file_writing(self, enable: bool) -> None:
"""Enable or disable file writing."""
self._put("CMD:FILE_WRITING", 1 if enable else 0)
def set_output_dir(self, path: str) -> None:
"""Set the output directory for file writing."""
self._put("CMD:OUTPUT_DIR", path)
def set_file_format(self, fmt: str) -> None:
"""Set file format: 'raw' or 'hdf5'."""
self._put("CMD:FILE_FORMAT", fmt)
def set_adc_clock_freq(self, freq_mhz: float) -> None: def set_adc_clock_freq(self, freq_mhz: float) -> None:
"""Set ADC clock frequency in MHz (50-100).""" """Set ADC clock frequency in MHz (50-100)."""
self._put("CMD:ADC_CLOCK_FREQ", freq_mhz) self._put("CMD:ADC_CLOCK_FREQ", freq_mhz)
@@ -155,74 +274,61 @@ class Client:
def get_status(self) -> list[str]: def get_status(self) -> list[str]:
"""Get server status strings.""" """Get server status strings."""
val = self._get("STATUS") val = self._get("STATUS")
return list(val.value) if hasattr(val, 'value') else list(val) return list(val) if not isinstance(val, str) else [val]
def is_acquiring(self) -> bool: def is_acquiring(self) -> bool:
"""Check if DAQ is running.""" """Check if DAQ is running."""
val = self._get("ACQUIRING") return bool(self._get("ACQUIRING"))
return bool(val.value if hasattr(val, 'value') else val)
def get_frame_rate(self) -> float: def get_frame_rate(self) -> float:
"""Get current frame rate in Hz.""" """Get current frame rate in Hz."""
val = self._get("FRAME_RATE") return float(self._get("FRAME_RATE"))
return float(val.value if hasattr(val, 'value') else val)
def get_frame_count(self) -> int: def get_frame_count(self) -> int:
"""Get total frames acquired.""" """Get total frames acquired."""
val = self._get("FRAME_COUNT") return int(self._get("FRAME_COUNT"))
return int(val.value if hasattr(val, 'value') else val)
def get_exposure_mode(self) -> int: def get_exposure_mode(self) -> int:
"""Get current exposure mode.""" """Get current exposure mode."""
val = self._get("EXPOSURE_MODE") return int(self._get("EXPOSURE_MODE"))
return int(val.value if hasattr(val, 'value') else val)
def get_trigger_mode(self) -> int: def get_trigger_mode(self) -> int:
"""Get current trigger mode.""" """Get current trigger mode."""
val = self._get("TRIGGER_MODE") return int(self._get("TRIGGER_MODE"))
return int(val.value if hasattr(val, 'value') else val)
def get_integration_time(self) -> float: def get_integration_time(self) -> float:
"""Get current integration time in ms.""" """Get current integration time in ms."""
val = self._get("INTEGRATION_TIME") return float(self._get("INTEGRATION_TIME"))
return float(val.value if hasattr(val, 'value') else val)
def is_file_writing(self) -> bool: def get_capture_status(self) -> str:
"""Check if file writing is enabled.""" """Get capture status (IDLE, CAPTURING, READY, ERROR)."""
val = self._get("FILE_WRITING") return str(self._get("CAPTURE:STATUS"))
return bool(val.value if hasattr(val, 'value') else val)
def get_output_dir(self) -> str: def get_capture_progress(self) -> tuple[int, int]:
"""Get current output directory.""" """Get capture progress as (captured, total)."""
val = self._get("OUTPUT_DIR") progress = int(self._get("CAPTURE:PROGRESS"))
return str(val.value if hasattr(val, 'value') else val) total = int(self._get("CAPTURE:TOTAL"))
return progress, total
def get_scan_number(self) -> int:
"""Get current scan number."""
val = self._get("SCAN_NUMBER")
return int(val.value if hasattr(val, 'value') else val)
def get_bellow_position(self) -> int: def get_bellow_position(self) -> int:
"""Get bellow stage position.""" """Get bellow stage position."""
val = self._get("BELLOW:POSITION") return int(self._get("BELLOW:POSITION"))
return int(val.value if hasattr(val, 'value') else val)
def get_power_voltages(self) -> np.ndarray: def get_power_voltages(self) -> np.ndarray:
"""Get power supply voltage readings.""" """Get power supply voltage readings."""
val = self._get("POWER:VOLTAGES") return np.asarray(self._get("POWER:VOLTAGES"))
return np.asarray(val.value if hasattr(val, 'value') else val)
# --- Monitors (live subscriptions) --- # --- Monitors (live subscriptions) ---
def monitor_image(self, callback: Callable[[np.ndarray, int], None]) -> None: def monitor_image(self, callback: Callable[[np.ndarray], None]) -> None:
""" """
Subscribe to live image updates. Subscribe to live image updates.
The callback receives (image_array, frame_count) for each new frame. The callback receives a 2D numpy array for each new frame.
Only one image monitor can be active at a time. Only one image monitor can be active at a time.
Args: Args:
callback: Function called with (numpy array, frame_count) for each frame. callback: Function called with numpy array for each frame.
""" """
self.stop_monitor("IMAGE") self.stop_monitor("IMAGE")
@@ -231,12 +337,10 @@ class Client:
def _on_image(value): def _on_image(value):
try: try:
arr = value.value arr = value.value
# NTNDArray stores shape info in the dimension field
if hasattr(value, 'dimension') and value.dimension: if hasattr(value, 'dimension') and value.dimension:
dims = value.dimension shape = tuple(d.size for d in value.dimension)
shape = tuple(d.size for d in dims)
arr = arr.reshape(shape) arr = arr.reshape(shape)
callback(arr, 0) callback(arr)
except Exception as e: except Exception as e:
logger.debug("Image monitor error: %s", e) logger.debug("Image monitor error: %s", e)
@@ -277,20 +381,25 @@ class Client:
self._subscriptions.clear() self._subscriptions.clear()
def close(self) -> None: def close(self) -> None:
"""Close the client and release resources.""" """Close the client and release all resources."""
self.stop_all_monitors() self.stop_all_monitors()
self._ctx.close() self._ctx.close()
self._zmq_ctx.term()
# Example usage # Example usage
if __name__ == "__main__": if __name__ == "__main__":
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
client = Client() client = Client(data_host="detector-machine")
try: try:
print("Status:", client.get_status()) print("Status:", client.get_status())
print("Acquiring:", client.is_acquiring())
print("Frame rate:", client.get_frame_rate()) # One-shot acquisition of 10 frames
client.set_exposure_mode(ExposureModes.GLOBAL_SHUTTER_CDS)
client.set_integration_time(6.0)
frames = client.acquire_frames(10)
print(f"Got {frames.shape[0]} frames, shape={frames.shape}")
except CommandError as e: except CommandError as e:
print(f"Error: {e}") print(f"Error: {e}")
finally: finally:

View File

@@ -1 +1 @@
from .writer import FileWriter from .capture import FrameCapture, DataTransferServer

View File

@@ -0,0 +1,264 @@
"""
On-demand frame capture and ZMQ data transfer.
Instead of continuous local file writing, this module captures a requested
number of frames from the shared memory ring buffer into a temp file, then
serves them to remote clients via ZMQ. This is designed for the common case
where the server runs on a dedicated detector machine and clients need to
retrieve data over a (slower) network.
Flow:
1. Client triggers capture via PVA: pvput TEAM1K:CMD:CAPTURE <num_frames>
2. FrameCapture thread reads N frames from shmring → temp file
3. Client polls TEAM1K:CAPTURE:STATUS via PVA until "READY"
4. Client fetches data via ZMQ: sends b"FETCH", receives [header, data]
"""
import json
import logging
import tempfile
import threading
from pathlib import Path
import numpy as np
import zmq
from ..detector.chip_config import ChipConfig, TEAM1K_CONFIG
logger = logging.getLogger(__name__)
DEFAULT_ZMQ_PORT = 42005
class FrameCapture(threading.Thread):
"""
Captures frames from shmring on demand.
When triggered via capture(), reads N consecutive frames from the ring
buffer and writes them to a temp file. Status is tracked and exposed
for the PVA interface.
"""
def __init__(self, ring_name: str, chip_config: ChipConfig = TEAM1K_CONFIG):
super().__init__(daemon=True, name="team1k-capture")
self._ring_name = ring_name
self._config = chip_config
self._shutdown = threading.Event()
# Capture state
self._capture_event = threading.Event()
self._capture_done = threading.Event()
self._num_requested = 0
self._num_captured = 0
self._status = "IDLE"
self._error_msg = ""
self._temp_path: str | None = None
self._data_lock = threading.Lock()
@property
def status(self) -> str:
return self._status
@property
def num_captured(self) -> int:
return self._num_captured
@property
def num_requested(self) -> int:
return self._num_requested
@property
def error_message(self) -> str:
return self._error_msg
def capture(self, num_frames: int) -> None:
"""Request capture of N frames from the ring buffer."""
self._num_requested = num_frames
self._num_captured = 0
self._status = "CAPTURING"
self._error_msg = ""
self._capture_done.clear()
self._capture_event.set()
logger.info("Capture requested: %d frames", num_frames)
def wait_complete(self, timeout: float | None = None) -> bool:
"""Wait for current capture to complete. Returns True if complete."""
return self._capture_done.wait(timeout)
def get_data(self) -> tuple[dict, bytes] | None:
"""
Read captured data. Returns (header_dict, raw_bytes) or None.
The caller should hold this briefly — a new capture will block
until this returns.
"""
with self._data_lock:
if self._status != "READY" or not self._temp_path:
return None
header = {
"num_frames": self._num_captured,
"nx": self._config.image_nx,
"ny": self._config.image_ny,
"dtype": "uint16",
}
try:
data = Path(self._temp_path).read_bytes()
except Exception as e:
logger.error("Failed to read capture file: %s", e)
return None
return header, data
def run(self) -> None:
"""Main thread loop: wait for capture requests."""
from locking_shmring import RingBufferReader
reader = RingBufferReader(self._ring_name)
logger.info("Frame capture connected to ring buffer '%s'", self._ring_name)
try:
while not self._shutdown.is_set():
if not self._capture_event.wait(0.2):
continue
self._capture_event.clear()
self._do_capture(reader)
except Exception as e:
logger.error("Frame capture thread error: %s", e)
finally:
reader.close()
def _do_capture(self, reader) -> None:
"""Capture frames to a temp file."""
# Clean up previous temp file (under lock so concurrent reads finish)
with self._data_lock:
if self._temp_path:
try:
Path(self._temp_path).unlink(missing_ok=True)
except Exception:
pass
self._temp_path = None
try:
fd, temp_path = tempfile.mkstemp(suffix='.raw', prefix='team1k_capture_')
with open(fd, 'wb') as f:
for slot in reader.iter_frames(timeout_ms=5000):
if self._shutdown.is_set():
self._status = "ERROR"
self._error_msg = "Shutdown during capture"
self._capture_done.set()
return
with slot:
frame = slot.data_as_numpy(
(self._config.image_ny, self._config.image_nx),
np.uint16,
)
f.write(frame.tobytes())
self._num_captured += 1
if self._num_captured >= self._num_requested:
break
if self._num_captured < self._num_requested:
self._status = "ERROR"
self._error_msg = (
f"Only captured {self._num_captured}/{self._num_requested} "
f"frames (ring timeout)"
)
logger.error(self._error_msg)
else:
with self._data_lock:
self._temp_path = temp_path
self._status = "READY"
logger.info("Capture complete: %d frames (%s)",
self._num_captured, temp_path)
except Exception as e:
self._status = "ERROR"
self._error_msg = str(e)
logger.error("Capture error: %s", e)
self._capture_done.set()
def stop(self) -> None:
"""Stop the capture thread and clean up."""
self._shutdown.set()
if self._temp_path:
try:
Path(self._temp_path).unlink(missing_ok=True)
except Exception:
pass
class DataTransferServer(threading.Thread):
"""
ZMQ server for transferring captured frame data to clients.
Runs a REP socket. Supports two request types:
b"FETCH" — returns [header_json, frame_data_bytes]
b"STATUS" — returns JSON with capture status
"""
def __init__(self, capture: FrameCapture, chip_config: ChipConfig = TEAM1K_CONFIG,
port: int = DEFAULT_ZMQ_PORT):
super().__init__(daemon=True, name="team1k-data-transfer")
self._capture = capture
self._config = chip_config
self._port = port
self._shutdown = threading.Event()
def run(self) -> None:
ctx = zmq.Context()
sock = ctx.socket(zmq.REP)
sock.bind(f"tcp://*:{self._port}")
sock.setsockopt(zmq.RCVTIMEO, 200)
logger.info("Data transfer server listening on tcp://*:%d", self._port)
try:
while not self._shutdown.is_set():
try:
msg = sock.recv()
except zmq.Again:
continue
self._handle(sock, msg)
finally:
sock.close()
ctx.term()
def _handle(self, sock: zmq.Socket, msg: bytes) -> None:
try:
request = msg.decode('utf-8').strip()
except UnicodeDecodeError:
sock.send(json.dumps({"error": "Invalid request"}).encode())
return
if request == "FETCH":
self._handle_fetch(sock)
elif request == "STATUS":
sock.send(json.dumps({
"status": self._capture.status,
"num_captured": self._capture.num_captured,
"num_requested": self._capture.num_requested,
"error": self._capture.error_message,
}).encode())
else:
sock.send(json.dumps({"error": f"Unknown request: {request}"}).encode())
def _handle_fetch(self, sock: zmq.Socket) -> None:
result = self._capture.get_data()
if result is None:
sock.send_multipart([
json.dumps({
"error": "No data available",
"status": self._capture.status,
}).encode(),
b"",
])
return
header, data = result
sock.send_multipart([json.dumps(header).encode(), data])
logger.info("Sent %d frames (%d bytes) to client",
header["num_frames"], len(data))
def stop(self) -> None:
self._shutdown.set()

View File

@@ -1,317 +0,0 @@
"""
File writing from shared memory ring buffer.
Ports KTestFunctions::FileWritingThread from KTestFunctions.cxx (lines 1198-1241).
Supports both raw binary (backward compatible) and HDF5 formats.
"""
import os
import struct
import shutil
import logging
import threading
from pathlib import Path
import numpy as np
from ..detector.chip_config import ChipConfig, TEAM1K_CONFIG
logger = logging.getLogger(__name__)
# Minimum free disk space in MB before stopping writes
_MIN_FREE_SPACE_MB = 256
# Scan number persistence file
_SCAN_NUMBER_FILE = "/usr/local/k_test/parameters/last_scan_number.dat"
class FileWriter:
"""
Reads frames from locking_shmring and writes to disk.
Supports two formats:
- 'raw': Binary files matching the C++ k_test format (4-byte header + pixel data)
- 'hdf5': HDF5 files with chunked datasets and metadata
Runs as a background thread in the main process.
"""
def __init__(self, ring_name: str, chip_config: ChipConfig = TEAM1K_CONFIG):
self._ring_name = ring_name
self._config = chip_config
self._thread: threading.Thread | None = None
self._running = False
# Configuration (can be changed while running)
self._enabled = threading.Event()
self._output_dir = "/data"
self._format = "raw" # or "hdf5"
self._scan_number = self._load_scan_number()
self._lock = threading.Lock()
# Stats
self.frames_written: int = 0
self.bytes_written: int = 0
def _load_scan_number(self) -> int:
"""Load the last scan number from the persistent file."""
try:
with open(_SCAN_NUMBER_FILE, 'rb') as f:
data = f.read(4)
if len(data) == 4:
return struct.unpack('<I', data)[0]
except FileNotFoundError:
pass
except Exception as e:
logger.warning("Could not read scan number file: %s", e)
return 0
def _save_scan_number(self) -> None:
"""Save the current scan number to the persistent file."""
try:
os.makedirs(os.path.dirname(_SCAN_NUMBER_FILE), exist_ok=True)
with open(_SCAN_NUMBER_FILE, 'wb') as f:
f.write(struct.pack('<I', self._scan_number))
except Exception as e:
logger.warning("Could not save scan number file: %s", e)
@property
def scan_number(self) -> int:
return self._scan_number
@property
def output_dir(self) -> str:
return self._output_dir
@property
def file_format(self) -> str:
return self._format
@property
def is_enabled(self) -> bool:
return self._enabled.is_set()
def enable(self, output_dir: str | None = None, file_format: str | None = None) -> None:
"""
Enable file writing.
Args:
output_dir: Output directory (default: keep current).
file_format: File format, 'raw' or 'hdf5' (default: keep current).
"""
with self._lock:
if output_dir is not None:
self._output_dir = output_dir
if file_format is not None:
if file_format not in ('raw', 'hdf5'):
raise ValueError(f"Unsupported format: {file_format}")
self._format = file_format
# Increment scan number
self._scan_number += 1
self._save_scan_number()
# Create scan directory
scan_dir = os.path.join(self._output_dir,
f"scan_{self._scan_number:010d}")
os.makedirs(scan_dir, exist_ok=True)
self._enabled.set()
logger.info("File writing enabled: dir=%s, format=%s, scan=%d",
self._output_dir, self._format, self._scan_number)
def disable(self) -> None:
"""Disable file writing."""
self._enabled.clear()
logger.info("File writing disabled")
def start(self) -> None:
"""Start the writer thread."""
if self._thread and self._thread.is_alive():
return
self._running = True
self._thread = threading.Thread(target=self._writer_loop, daemon=True,
name="team1k-filewriter")
self._thread.start()
logger.info("File writer thread started")
def stop(self) -> None:
"""Stop the writer thread."""
self._running = False
self._enabled.clear()
if self._thread:
self._thread.join(timeout=5.0)
logger.info("File writer thread stopped")
def _check_disk_space(self) -> bool:
"""Check for sufficient free disk space."""
try:
stat = shutil.disk_usage(self._output_dir)
free_mb = stat.free / (1024 * 1024)
if free_mb < _MIN_FREE_SPACE_MB:
logger.error("Disk almost full (%.0f MB free), stopping writes", free_mb)
return False
return True
except Exception as e:
logger.error("Could not check disk space: %s", e)
return False
def _writer_loop(self) -> None:
"""Main writer loop reading from shmring."""
from locking_shmring import RingBufferReader
reader = RingBufferReader(self._ring_name)
logger.info("File writer connected to ring buffer '%s'", self._ring_name)
raw_fp = None
h5_file = None
h5_dataset = None
file_number = 0
transfers_in_file = 0
ntransfers_per_file = self._config.ntransfers_per_file
current_scan = 0
try:
for slot in reader.iter_frames(timeout_ms=200):
if not self._running:
slot.release()
break
with slot:
if not self._enabled.is_set():
# Close any open files
if raw_fp:
raw_fp.close()
raw_fp = None
if h5_file:
h5_file.close()
h5_file = None
h5_dataset = None
file_number = 0
transfers_in_file = 0
continue
# Check if scan number changed (new enable call)
with self._lock:
scan = self._scan_number
fmt = self._format
out_dir = self._output_dir
if scan != current_scan:
# Close old files
if raw_fp:
raw_fp.close()
raw_fp = None
if h5_file:
h5_file.close()
h5_file = None
h5_dataset = None
file_number = 0
transfers_in_file = 0
current_scan = scan
# Check disk space periodically
if transfers_in_file == 0 and not self._check_disk_space():
self._enabled.clear()
continue
frame = slot.data_as_numpy(
(self._config.image_ny, self._config.image_nx), np.uint16
)
if fmt == "raw":
raw_fp, file_number, transfers_in_file = self._write_raw(
frame, slot.frame_id, raw_fp, out_dir, scan,
file_number, transfers_in_file, ntransfers_per_file
)
elif fmt == "hdf5":
h5_file, h5_dataset = self._write_hdf5(
frame, slot.frame_id, slot.timestamp_ns,
h5_file, h5_dataset, out_dir, scan, file_number
)
self.frames_written += 1
self.bytes_written += self._config.image_size_bytes
except Exception as e:
logger.error("File writer error: %s", e)
finally:
if raw_fp:
raw_fp.close()
if h5_file:
h5_file.close()
reader.close()
def _write_raw(self, frame: np.ndarray, frame_id: int,
fp, out_dir: str, scan: int,
file_number: int, transfers_in_file: int,
ntransfers_per_file: int):
"""Write a frame in raw binary format (backward compatible with C++ k_test)."""
if transfers_in_file == 0:
if fp:
fp.close()
filename = os.path.join(
out_dir,
f"scan_{scan:010d}",
f"team1k_ued_{scan:010d}_{file_number:010d}.dat"
)
fp = open(filename, 'wb')
# Write first image number header
fp.write(struct.pack('<I', frame_id))
fp.write(frame.tobytes())
fp.flush()
transfers_in_file += 1
if transfers_in_file >= ntransfers_per_file:
transfers_in_file = 0
file_number += 1
return fp, file_number, transfers_in_file
def _write_hdf5(self, frame: np.ndarray, frame_id: int, timestamp_ns: int,
h5_file, h5_dataset, out_dir: str, scan: int, file_number: int):
"""Write a frame in HDF5 format."""
try:
import h5py
except ImportError:
logger.error("h5py not installed, cannot write HDF5 files")
self._enabled.clear()
return None, None
if h5_file is None:
filename = os.path.join(
out_dir,
f"scan_{scan:010d}",
f"team1k_ued_{scan:010d}_{file_number:010d}.h5"
)
h5_file = h5py.File(filename, 'w')
h5_dataset = h5_file.create_dataset(
'data',
shape=(0, self._config.image_ny, self._config.image_nx),
maxshape=(None, self._config.image_ny, self._config.image_nx),
dtype=np.uint16,
chunks=(1, self._config.image_ny, self._config.image_nx),
)
h5_file.create_dataset('timestamps', shape=(0,), maxshape=(None,),
dtype=np.float64)
h5_file.create_dataset('frame_ids', shape=(0,), maxshape=(None,),
dtype=np.uint64)
h5_file.attrs['scan_number'] = scan
# Append frame
n = h5_dataset.shape[0]
h5_dataset.resize(n + 1, axis=0)
h5_dataset[n] = frame
ts_ds = h5_file['timestamps']
ts_ds.resize(n + 1, axis=0)
ts_ds[n] = timestamp_ns / 1e9
fid_ds = h5_file['frame_ids']
fid_ds.resize(n + 1, axis=0)
fid_ds[n] = frame_id
h5_file.flush()
return h5_file, h5_dataset

View File

@@ -69,25 +69,21 @@ class PVAInterface:
{prefix}EXPOSURE_MODE - int, current exposure mode {prefix}EXPOSURE_MODE - int, current exposure mode
{prefix}TRIGGER_MODE - int, current trigger mode {prefix}TRIGGER_MODE - int, current trigger mode
{prefix}INTEGRATION_TIME - float, integration time ms {prefix}INTEGRATION_TIME - float, integration time ms
{prefix}FILE_WRITING - bool, file writing enabled {prefix}CAPTURE:STATUS - string, capture state (IDLE/CAPTURING/READY/ERROR)
{prefix}OUTPUT_DIR - string, output directory {prefix}CAPTURE:PROGRESS - int, frames captured so far
{prefix}SCAN_NUMBER - int, current scan number {prefix}CAPTURE:TOTAL - int, frames requested
Commands (writable): Commands (writable):
{prefix}CMD:EXPOSURE_MODE - int, set exposure mode {prefix}CMD:EXPOSURE_MODE - int, set exposure mode
{prefix}CMD:TRIGGER_MODE - int, set trigger mode {prefix}CMD:TRIGGER_MODE - int, set trigger mode
{prefix}CMD:INTEGRATION_TIME - float, set integration time ms {prefix}CMD:INTEGRATION_TIME - float, set integration time ms
{prefix}CMD:START_DAQ - int, 1=start, 0=stop {prefix}CMD:START_DAQ - int, 1=start, 0=stop
{prefix}CMD:FILE_WRITING - int, 1=enable, 0=disable {prefix}CMD:CAPTURE - int, capture N frames
{prefix}CMD:OUTPUT_DIR - string, set output directory
{prefix}CMD:FILE_FORMAT - string, "raw" or "hdf5"
{prefix}CMD:ADC_CLOCK_FREQ - float, set ADC clock MHz {prefix}CMD:ADC_CLOCK_FREQ - float, set ADC clock MHz
{prefix}CMD:ADC_DATA_DELAY - int, set ADC data delay {prefix}CMD:ADC_DATA_DELAY - int, set ADC data delay
{prefix}CMD:PARAMETER_FILE - string, apply parameter file {prefix}CMD:PARAMETER_FILE - string, apply parameter file
{prefix}CMD:RESET - int, 1=reset detector connection {prefix}CMD:RESET - int, 1=reset detector connection
{prefix}CMD:TEST_MODE - int, 0/1 {prefix}CMD:TEST_MODE - int, 0/1
{prefix}CMD:REGISTER_READ - int, read register at address
{prefix}CMD:REGISTER_WRITE - int array [address, value]
Data: Data:
{prefix}IMAGE - NTNDArray, live image stream {prefix}IMAGE - NTNDArray, live image stream
@@ -140,12 +136,12 @@ class PVAInterface:
NTScalar('i'), 0) NTScalar('i'), 0)
self._pvs[f"{p}INTEGRATION_TIME"] = self._make_status_pv( self._pvs[f"{p}INTEGRATION_TIME"] = self._make_status_pv(
NTScalar('d'), 0.0) NTScalar('d'), 0.0)
self._pvs[f"{p}FILE_WRITING"] = self._make_status_pv( self._pvs[f"{p}CAPTURE:STATUS"] = self._make_status_pv(
NTScalar('?'), False) NTScalar('s'), "IDLE")
self._pvs[f"{p}OUTPUT_DIR"] = self._make_status_pv( self._pvs[f"{p}CAPTURE:PROGRESS"] = self._make_status_pv(
NTScalar('s'), "/data") NTScalar('i'), 0)
self._pvs[f"{p}SCAN_NUMBER"] = self._make_status_pv( self._pvs[f"{p}CAPTURE:TOTAL"] = self._make_status_pv(
NTScalar('l'), 0) NTScalar('i'), 0)
# --- Command PVs (writable) --- # --- Command PVs (writable) ---
self._pvs[f"{p}CMD:EXPOSURE_MODE"] = self._make_command_pv( self._pvs[f"{p}CMD:EXPOSURE_MODE"] = self._make_command_pv(
@@ -156,12 +152,8 @@ class PVAInterface:
NTScalar('d'), 0.0, "set_integration_time") NTScalar('d'), 0.0, "set_integration_time")
self._pvs[f"{p}CMD:START_DAQ"] = self._make_command_pv( self._pvs[f"{p}CMD:START_DAQ"] = self._make_command_pv(
NTScalar('i'), 0, "start_stop_daq") NTScalar('i'), 0, "start_stop_daq")
self._pvs[f"{p}CMD:FILE_WRITING"] = self._make_command_pv( self._pvs[f"{p}CMD:CAPTURE"] = self._make_command_pv(
NTScalar('i'), 0, "set_file_writing") NTScalar('i'), 0, "capture_frames")
self._pvs[f"{p}CMD:OUTPUT_DIR"] = self._make_command_pv(
NTScalar('s'), "", "set_output_dir")
self._pvs[f"{p}CMD:FILE_FORMAT"] = self._make_command_pv(
NTScalar('s'), "raw", "set_file_format")
self._pvs[f"{p}CMD:ADC_CLOCK_FREQ"] = self._make_command_pv( self._pvs[f"{p}CMD:ADC_CLOCK_FREQ"] = self._make_command_pv(
NTScalar('d'), 60.0, "set_adc_clock_freq") NTScalar('d'), 60.0, "set_adc_clock_freq")
self._pvs[f"{p}CMD:ADC_DATA_DELAY"] = self._make_command_pv( self._pvs[f"{p}CMD:ADC_DATA_DELAY"] = self._make_command_pv(

View File

@@ -11,7 +11,8 @@ Architecture:
- Detector register read/write (UDP port 42000) - Detector register read/write (UDP port 42000)
- PVA server (commands, status, data) - PVA server (commands, status, data)
- PVA streamer thread (shmring -> NTNDArray) - PVA streamer thread (shmring -> NTNDArray)
- File writer thread (shmring -> raw/HDF5 files) - Frame capture thread (shmring -> temp file, on demand)
- ZMQ data transfer server (temp file -> client)
- Peripheral control (bellow stage, power supply) - Peripheral control (bellow stage, power supply)
Acquisition subprocess: Acquisition subprocess:
@@ -35,7 +36,7 @@ from .detector.chip_config import TEAM1K_CONFIG, configure_chip
from .detector.parameter_file import apply_parameter_file from .detector.parameter_file import apply_parameter_file
from .acquisition.receiver import AcquisitionProcess from .acquisition.receiver import AcquisitionProcess
from .filewriter.writer import FileWriter from .filewriter.capture import FrameCapture, DataTransferServer
from .pva.interface import PVAInterface from .pva.interface import PVAInterface
from .pva.streamer import PVAStreamer from .pva.streamer import PVAStreamer
from .peripherals.bellow_stage import BellowStage from .peripherals.bellow_stage import BellowStage
@@ -49,6 +50,7 @@ DEFAULT_REGISTER_PORT = 42000
DEFAULT_DATA_PORT = 41000 DEFAULT_DATA_PORT = 41000
DEFAULT_PV_PREFIX = "TEAM1K:" DEFAULT_PV_PREFIX = "TEAM1K:"
DEFAULT_BELLOW_PORT = "/dev/CameraBellowStage" DEFAULT_BELLOW_PORT = "/dev/CameraBellowStage"
DEFAULT_ZMQ_PORT = 42005
class Team1kServer: class Team1kServer:
@@ -56,7 +58,8 @@ class Team1kServer:
Main server coordinating all subsystems. Main server coordinating all subsystems.
Manages direct UDP communication with the detector, data acquisition Manages direct UDP communication with the detector, data acquisition
via a dedicated subprocess, PVA streaming, and optional file writing. via a dedicated subprocess, PVA streaming, and on-demand frame capture
with ZMQ data transfer to remote clients.
""" """
def __init__(self, detector_ip: str = DEFAULT_DETECTOR_IP, def __init__(self, detector_ip: str = DEFAULT_DETECTOR_IP,
@@ -64,7 +67,8 @@ class Team1kServer:
data_port: int = DEFAULT_DATA_PORT, data_port: int = DEFAULT_DATA_PORT,
pv_prefix: str = DEFAULT_PV_PREFIX, pv_prefix: str = DEFAULT_PV_PREFIX,
config_file: str | None = None, config_file: str | None = None,
bellow_port: str = DEFAULT_BELLOW_PORT): bellow_port: str = DEFAULT_BELLOW_PORT,
zmq_port: int = DEFAULT_ZMQ_PORT):
self.detector_ip = detector_ip self.detector_ip = detector_ip
self.register_port = register_port self.register_port = register_port
@@ -90,8 +94,11 @@ class Team1kServer:
# PVA interface # PVA interface
self.pva = PVAInterface(self, prefix=pv_prefix) self.pva = PVAInterface(self, prefix=pv_prefix)
# File writer (thread in main process) # Frame capture + ZMQ data transfer (replaces local file writing)
self.file_writer = FileWriter("team1k_frames", self.chip_config) self.frame_capture = FrameCapture("team1k_frames", self.chip_config)
self.data_server = DataTransferServer(
self.frame_capture, self.chip_config, port=zmq_port,
)
# PVA streamer will be created after PVA setup # PVA streamer will be created after PVA setup
self._pva_streamer: PVAStreamer | None = None self._pva_streamer: PVAStreamer | None = None
@@ -173,24 +180,14 @@ class Team1kServer:
else: else:
self._stop_daq() self._stop_daq()
elif command == "set_file_writing": elif command == "capture_frames":
if int(value): num = int(value)
self.file_writer.enable() if num <= 0:
self.pva.post_status("FILE_WRITING", True) raise ValueError(f"Invalid frame count: {num}")
self.pva.post_status("SCAN_NUMBER", self.file_writer.scan_number) self.frame_capture.capture(num)
else: self.pva.post_status("CAPTURE:STATUS", "CAPTURING")
self.file_writer.disable() self.pva.post_status("CAPTURE:TOTAL", num)
self.pva.post_status("FILE_WRITING", False) self.pva.post_status("CAPTURE:PROGRESS", 0)
elif command == "set_output_dir":
self.file_writer._output_dir = str(value)
self.pva.post_status("OUTPUT_DIR", str(value))
elif command == "set_file_format":
fmt = str(value)
if fmt not in ("raw", "hdf5"):
raise ValueError(f"Unsupported format: {fmt}")
self.file_writer._format = fmt
elif command == "set_adc_clock_freq": elif command == "set_adc_clock_freq":
self.adc.set_clock_freq(float(value)) self.adc.set_clock_freq(float(value))
@@ -293,9 +290,8 @@ class Team1kServer:
self.pva.post_status("FRAME_RATE", self._pva_streamer.frame_rate) self.pva.post_status("FRAME_RATE", self._pva_streamer.frame_rate)
self.pva.post_status("FRAME_COUNT", self._pva_streamer.frame_count) self.pva.post_status("FRAME_COUNT", self._pva_streamer.frame_count)
self.pva.post_status("FILE_WRITING", self.file_writer.is_enabled) self.pva.post_status("CAPTURE:STATUS", self.frame_capture.status)
self.pva.post_status("SCAN_NUMBER", self.file_writer.scan_number) self.pva.post_status("CAPTURE:PROGRESS", self.frame_capture.num_captured)
self.pva.post_status("OUTPUT_DIR", self.file_writer.output_dir)
except Exception as e: except Exception as e:
logger.debug("Status update error: %s", e) logger.debug("Status update error: %s", e)
@@ -329,8 +325,9 @@ class Team1kServer:
) )
self._pva_streamer.start() self._pva_streamer.start()
# Start file writer thread # Start frame capture + ZMQ data transfer threads
self.file_writer.start() self.frame_capture.start()
self.data_server.start()
# Start status update thread # Start status update thread
status_thread = threading.Thread(target=self._status_update_loop, status_thread = threading.Thread(target=self._status_update_loop,
@@ -370,8 +367,9 @@ class Team1kServer:
if self._pva_streamer: if self._pva_streamer:
self._pva_streamer.stop() self._pva_streamer.stop()
# Stop file writer # Stop frame capture + data transfer
self.file_writer.stop() self.frame_capture.stop()
self.data_server.stop()
# Shutdown acquisition subprocess # Shutdown acquisition subprocess
self.acquisition.shutdown() self.acquisition.shutdown()
@@ -400,6 +398,8 @@ def main():
help="Parameter file to apply on startup") help="Parameter file to apply on startup")
parser.add_argument('--bellow-port', default=DEFAULT_BELLOW_PORT, parser.add_argument('--bellow-port', default=DEFAULT_BELLOW_PORT,
help=f"Bellow stage serial port (default: {DEFAULT_BELLOW_PORT})") help=f"Bellow stage serial port (default: {DEFAULT_BELLOW_PORT})")
parser.add_argument('--zmq-port', type=int, default=DEFAULT_ZMQ_PORT,
help=f"ZMQ data transfer port (default: {DEFAULT_ZMQ_PORT})")
parser.add_argument('--log-level', default='INFO', parser.add_argument('--log-level', default='INFO',
choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'],
help="Logging level (default: INFO)") help="Logging level (default: INFO)")
@@ -417,6 +417,7 @@ def main():
pv_prefix=args.pv_prefix, pv_prefix=args.pv_prefix,
config_file=args.config, config_file=args.config,
bellow_port=args.bellow_port, bellow_port=args.bellow_port,
zmq_port=args.zmq_port,
) )
try: try: