Claude v1

This commit is contained in:
2026-02-27 01:14:08 -06:00
parent d7e8a093e0
commit 08135c6c13
23 changed files with 2989 additions and 1909 deletions

View File

@@ -14,10 +14,12 @@ requires-python = ">=3.9"
dependencies = [ dependencies = [
"numpy >= 1.24.0", "numpy >= 1.24.0",
"pyserial >= 3.5", "pyserial >= 3.5",
"pydantic >= 2.0.0", "p4p >= 4.1.0",
"typing-extensions >= 4.5.0",
] ]
[project.optional-dependencies]
hdf5 = ["h5py >= 3.0.0"]
[project.scripts] [project.scripts]
team1k-server = "team1k.server:main" team1k-server = "team1k.server:main"

View File

@@ -1,598 +1,297 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
Team1k Client - Interface to the Team1k detector control server. Team1k Client — PVA-based interface to the Team1k detector server.
Replaces the previous ZMQ-based client with EPICS PV Access communication.
All commands are sent via pvput to command PVs, and status/data are
received via pvget or monitor subscriptions.
""" """
import uuid import enum
import time import logging
import socket
import asyncio
import threading import threading
from typing import Dict, List, Any, Optional, Tuple, Union, Callable from typing import Any, Callable
import zmq import numpy as np
import zmq.asyncio
from .KTestClient import KTestClient, KTestCommand, KTestError, ExposureModes, TriggerModes from p4p.client.thread import Context, Subscription
logger = logging.getLogger(__name__)
class CommandException(Exception): class ExposureModes(enum.IntEnum):
"""Exception raised when a command fails on the server side.""" """Detector exposure modes."""
ROLLING_SHUTTER = 0
GLOBAL_SHUTTER = 1
GLOBAL_SHUTTER_FLUSH = 2
GLOBAL_SHUTTER_CDS = 3
class TriggerModes(enum.IntEnum):
"""Detector trigger modes."""
INTERNAL = 0
EXTERNAL = 1
class CommandError(Exception):
"""Raised when a PVA command put fails."""
pass pass
class Client: class Client:
""" """
Client for the Team1k server. PVA client for the Team1k detector server.
Provides methods to send commands and receive status updates. All communication uses EPICS PV Access:
- Commands: pvput to {prefix}CMD:* PVs
- Status: pvget from {prefix}* status PVs
- Data: monitor on {prefix}IMAGE PV
Args:
prefix: PV name prefix (default: "TEAM1K:").
timeout: Default timeout in seconds for PVA operations.
""" """
def __init__(self, host: str = 'localhost', cmd_port: int = 42003, pub_port: int = 42004): def __init__(self, prefix: str = "TEAM1K:", timeout: float = 5.0):
self._prefix = prefix
self._timeout = timeout
self._ctx = Context("pva")
self._subscriptions: dict[str, Subscription] = {}
def _pv(self, name: str) -> str:
"""Build full PV name."""
return f"{self._prefix}{name}"
def _put(self, pv_suffix: str, value: Any) -> None:
"""Put a value to a command PV."""
pv_name = self._pv(pv_suffix)
try:
self._ctx.put(pv_name, value, timeout=self._timeout)
except Exception as e:
raise CommandError(f"Failed to put {pv_name}={value}: {e}") from e
def _get(self, pv_suffix: str) -> Any:
"""Get the current value of a PV."""
pv_name = self._pv(pv_suffix)
try:
return self._ctx.get(pv_name, timeout=self._timeout)
except Exception as e:
raise CommandError(f"Failed to get {pv_name}: {e}") from e
# --- Detector commands ---
def set_exposure_mode(self, mode: ExposureModes | int) -> None:
"""Set the detector exposure mode (0-3)."""
self._put("CMD:EXPOSURE_MODE", int(mode))
def set_trigger_mode(self, mode: TriggerModes | int) -> None:
"""Set the trigger mode (0=internal, 1=external)."""
self._put("CMD:TRIGGER_MODE", int(mode))
def set_integration_time(self, time_ms: float) -> None:
"""Set the integration time in milliseconds."""
self._put("CMD:INTEGRATION_TIME", time_ms)
def start_daq(self) -> None:
"""Start data acquisition."""
self._put("CMD:START_DAQ", 1)
def stop_daq(self) -> None:
"""Stop data acquisition."""
self._put("CMD:START_DAQ", 0)
def set_file_writing(self, enable: bool) -> None:
"""Enable or disable file writing."""
self._put("CMD:FILE_WRITING", 1 if enable else 0)
def set_output_dir(self, path: str) -> None:
"""Set the output directory for file writing."""
self._put("CMD:OUTPUT_DIR", path)
def set_file_format(self, fmt: str) -> None:
"""Set file format: 'raw' or 'hdf5'."""
self._put("CMD:FILE_FORMAT", fmt)
def set_adc_clock_freq(self, freq_mhz: float) -> None:
"""Set ADC clock frequency in MHz (50-100)."""
self._put("CMD:ADC_CLOCK_FREQ", freq_mhz)
def set_adc_data_delay(self, delay: int) -> None:
"""Set ADC data delay."""
self._put("CMD:ADC_DATA_DELAY", delay)
def load_parameter_file(self, file_path: str) -> None:
"""Apply a parameter file on the server."""
self._put("CMD:PARAMETER_FILE", file_path)
def reset_connection(self) -> None:
"""Reset the detector connection (stop DAQ, reinitialize)."""
self._put("CMD:RESET", 1)
def set_test_mode(self, enable: bool) -> None:
"""Enable or disable FPGA test data generation."""
self._put("CMD:TEST_MODE", 1 if enable else 0)
# --- Peripheral commands ---
def insert_detector(self) -> None:
"""Insert the detector (move bellow stage)."""
self._put("BELLOW:CMD", 1)
def retract_detector(self) -> None:
"""Retract the detector (move bellow stage)."""
self._put("BELLOW:CMD", 0)
def power_on(self) -> None:
"""Turn on the detector power supply."""
self._put("POWER:CMD", 1)
def power_off(self) -> None:
"""Turn off the detector power supply."""
self._put("POWER:CMD", 0)
# --- Status getters ---
def get_status(self) -> list[str]:
"""Get server status strings."""
val = self._get("STATUS")
return list(val.value) if hasattr(val, 'value') else list(val)
def is_acquiring(self) -> bool:
"""Check if DAQ is running."""
val = self._get("ACQUIRING")
return bool(val.value if hasattr(val, 'value') else val)
def get_frame_rate(self) -> float:
"""Get current frame rate in Hz."""
val = self._get("FRAME_RATE")
return float(val.value if hasattr(val, 'value') else val)
def get_frame_count(self) -> int:
"""Get total frames acquired."""
val = self._get("FRAME_COUNT")
return int(val.value if hasattr(val, 'value') else val)
def get_exposure_mode(self) -> int:
"""Get current exposure mode."""
val = self._get("EXPOSURE_MODE")
return int(val.value if hasattr(val, 'value') else val)
def get_trigger_mode(self) -> int:
"""Get current trigger mode."""
val = self._get("TRIGGER_MODE")
return int(val.value if hasattr(val, 'value') else val)
def get_integration_time(self) -> float:
"""Get current integration time in ms."""
val = self._get("INTEGRATION_TIME")
return float(val.value if hasattr(val, 'value') else val)
def is_file_writing(self) -> bool:
"""Check if file writing is enabled."""
val = self._get("FILE_WRITING")
return bool(val.value if hasattr(val, 'value') else val)
def get_output_dir(self) -> str:
"""Get current output directory."""
val = self._get("OUTPUT_DIR")
return str(val.value if hasattr(val, 'value') else val)
def get_scan_number(self) -> int:
"""Get current scan number."""
val = self._get("SCAN_NUMBER")
return int(val.value if hasattr(val, 'value') else val)
def get_bellow_position(self) -> int:
"""Get bellow stage position."""
val = self._get("BELLOW:POSITION")
return int(val.value if hasattr(val, 'value') else val)
def get_power_voltages(self) -> np.ndarray:
"""Get power supply voltage readings."""
val = self._get("POWER:VOLTAGES")
return np.asarray(val.value if hasattr(val, 'value') else val)
# --- Monitors (live subscriptions) ---
def monitor_image(self, callback: Callable[[np.ndarray, int], None]) -> None:
""" """
Initialize the Team1k client. Subscribe to live image updates.
The callback receives (image_array, frame_count) for each new frame.
Only one image monitor can be active at a time.
Args: Args:
host: The hostname of the server callback: Function called with (numpy array, frame_count) for each frame.
cmd_port: The command socket port
pub_port: The status publication port
""" """
self.host = host self.stop_monitor("IMAGE")
self.cmd_port = cmd_port
self.pub_port = pub_port pv_name = self._pv("IMAGE")
# Set up ZMQ for commands def _on_image(value):
self.context = zmq.Context()
self.cmd_socket = self.context.socket(zmq.REQ)
self.cmd_socket.connect(f"tcp://{host}:{cmd_port}")
# Status subscriber
self.sub_socket = None
self._status_thread = None
self._status_callback = None
self._status_running = False
self._last_status = None
def send_command(self, command: Union[str, KTestCommand], *args) -> Any:
"""
Send a command to the server.
Args:
command: A string command or KTestCommand enum
*args: Arguments for the command
Returns:
The server response
Raises:
CommandException: If the command fails on the server side
Exception: For other errors
"""
# Format command string
if isinstance(command, KTestCommand):
cmd_str = f"{command.value.lower()} {' '.join(str(arg) for arg in args)}"
else:
cmd_str = f"{command} {' '.join(str(arg) for arg in args)}"
# Send command
self.cmd_socket.send_string(cmd_str.strip())
# Get response
response = self.cmd_socket.recv_pyobj()
# Handle exceptions from the server
if isinstance(response, Exception):
if isinstance(response, (ValueError, RuntimeError)):
raise CommandException(str(response))
else:
raise response
return response
def subscribe_to_status(self, callback: Callable[[Dict[str, Any]], None]) -> None:
"""
Subscribe to status updates.
Args:
callback: Function to call with status updates
"""
# Stop existing subscription if there is one
self.unsubscribe_from_status()
# Create a ZMQ SUB socket
sub_socket = self.context.socket(zmq.SUB)
sub_socket.setsockopt_string(zmq.SUBSCRIBE, '')
sub_socket.connect(f"tcp://{self.host}:{self.pub_port}")
# Set up status thread
self.sub_socket = sub_socket
self._status_callback = callback
self._status_running = True
self._status_thread = threading.Thread(target=self._status_listener, daemon=True)
self._status_thread.start()
def unsubscribe_from_status(self) -> None:
"""Stop receiving status updates."""
if not self._status_running:
return
# Stop the status thread
self._status_running = False
if self._status_thread:
self._status_thread.join(timeout=1.0)
# Close the socket
if self.sub_socket:
self.sub_socket.close()
self.sub_socket = None
def _status_listener(self) -> None:
"""Background thread to listen for status updates."""
poller = zmq.Poller()
poller.register(self.sub_socket, zmq.POLLIN)
while self._status_running:
try: try:
socks = dict(poller.poll(timeout=1000)) arr = value.value
if self.sub_socket in socks: # NTNDArray stores shape info in the dimension field
status = self.sub_socket.recv_pyobj() if hasattr(value, 'dimension') and value.dimension:
dims = value.dimension
# Store status shape = tuple(d.size for d in dims)
self._last_status = status arr = arr.reshape(shape)
callback(arr, 0)
# Call callback
if self._status_callback:
self._status_callback(status)
except Exception as e: except Exception as e:
print(f"Error in status listener: {e}") logger.debug("Image monitor error: %s", e)
def get_status(self) -> Dict[str, Any]: self._subscriptions["IMAGE"] = self._ctx.monitor(pv_name, _on_image)
def monitor_status(self, pv_name: str,
callback: Callable[[Any], None]) -> None:
""" """
Get the current status from the server. Subscribe to a status PV for live updates.
Returns:
The current system status
"""
if self._last_status is not None:
return self._last_status
return self.send_command("get_status")
def insert_detector(self) -> str:
"""
Insert the detector into the beam.
Returns:
Success message
"""
return self.send_command("insert")
def retract_detector(self) -> str:
"""
Retract the detector from the beam.
Returns:
Success message
"""
return self.send_command("retract")
def power_supply_on(self) -> str:
"""
Turn on the detector power supply.
Returns:
Success message
"""
return self.send_command("power_on")
def power_supply_off(self) -> str:
"""
Turn off the detector power supply.
Returns:
Success message
"""
return self.send_command("power_off")
def read_voltages(self) -> Dict[str, List[float]]:
"""
Read the power supply voltages.
Returns:
Dictionary with voltage values
"""
return self.send_command("read_voltages")
# KTest command convenience methods
def reset_connection(self) -> Any:
"""
Reset the connection between the k_test server and the detector.
Returns:
Server response
"""
return self.send_command(KTestCommand.RESET_CONNECTION)
def start_daq(self) -> Any:
"""
Start the data acquisition process.
Returns:
Server response
"""
return self.send_command(KTestCommand.RESTART_DAQ)
def stop_daq(self) -> Any:
"""
Stop the data acquisition process.
Returns:
Server response
"""
return self.send_command(KTestCommand.STOP_DAQ)
def set_exposure_mode(self, exposure_mode: ExposureModes) -> Any:
"""
Set the exposure mode for the detector.
Args: Args:
exposure_mode: The exposure mode to set pv_name: PV suffix (e.g., "FRAME_RATE", "ACQUIRING").
callback: Function called with the new value on each update.
Returns:
Server response
""" """
return self.send_command(KTestCommand.SET_EXPOSURE_MODE, exposure_mode) self.stop_monitor(pv_name)
def set_trigger_mode(self, trigger_mode: TriggerModes) -> Any: full_name = self._pv(pv_name)
"""
Set the trigger mode for the detector. def _on_update(value):
try:
Args: val = value.value if hasattr(value, 'value') else value
trigger_mode: The trigger mode to set callback(val)
except Exception as e:
Returns: logger.debug("Monitor %s error: %s", pv_name, e)
Server response
""" self._subscriptions[pv_name] = self._ctx.monitor(full_name, _on_update)
return self.send_command(KTestCommand.SET_TRIGGER_MODE, trigger_mode)
def stop_monitor(self, name: str) -> None:
def set_integration_time(self, integration_time_ms: float) -> Any: """Stop a specific monitor subscription."""
""" sub = self._subscriptions.pop(name, None)
Set the integration time in milliseconds. if sub is not None:
sub.close()
Args:
integration_time_ms: The integration time in milliseconds def stop_all_monitors(self) -> None:
"""Stop all active monitor subscriptions."""
Returns: for sub in self._subscriptions.values():
Server response sub.close()
""" self._subscriptions.clear()
return self.send_command(KTestCommand.SET_INTEGRATION_TIME, integration_time_ms)
def load_parameter_file(self, file_path: str) -> Any:
"""
Load a parameter file for the detector.
Args:
file_path: Path to the parameter file
Returns:
Server response
"""
return self.send_command(KTestCommand.LOAD_PARAMETER_FILE, file_path)
def close(self) -> None: def close(self) -> None:
"""Close the client connections.""" """Close the client and release resources."""
# Unsubscribe if needed self.stop_all_monitors()
self.unsubscribe_from_status() self._ctx.close()
# Close command socket
if self.cmd_socket:
self.cmd_socket.close()
# Terminate context
if self.context:
self.context.term()
class AsyncClient:
"""
Asynchronous client for the Team1k server.
Provides async methods to send commands and receive status updates.
"""
def __init__(self, host: str = 'localhost', cmd_port: int = 42003, pub_port: int = 42004):
"""
Initialize the asynchronous Team1k client.
Args:
host: The hostname of the server
cmd_port: The command socket port
pub_port: The status publication port
"""
self.host = host
self.cmd_port = cmd_port
self.pub_port = pub_port
# Set up ZMQ for commands
self.context = zmq.asyncio.Context()
self.cmd_socket = self.context.socket(zmq.REQ)
self.cmd_socket.connect(f"tcp://{host}:{cmd_port}")
# Status subscriber
self.sub_socket = None
self._status_task = None
self._status_callbacks = []
self._status_running = False
self._last_status = None
async def send_command(self, command: Union[str, KTestCommand], *args) -> Any:
"""
Send a command to the server asynchronously.
Args:
command: A string command or KTestCommand enum
*args: Arguments for the command
Returns:
The server response
Raises:
CommandException: If the command fails on the server side
Exception: For other errors
"""
# Format command string
if isinstance(command, KTestCommand):
cmd_str = f"{command.value.lower()} {' '.join(str(arg) for arg in args)}"
else:
cmd_str = f"{command} {' '.join(str(arg) for arg in args)}"
# Send command
await self.cmd_socket.send_string(cmd_str.strip())
# Get response
response = await self.cmd_socket.recv_pyobj()
# Handle exceptions from the server
if isinstance(response, Exception):
if isinstance(response, (ValueError, RuntimeError)):
raise CommandException(str(response))
else:
raise response
return response
async def subscribe_to_status(self, callback: Callable[[Dict[str, Any]], None]) -> None:
"""
Subscribe to status updates asynchronously.
Args:
callback: Function to call with status updates
"""
# Add callback if we're already subscribed
if self._status_running and self.sub_socket:
self._status_callbacks.append(callback)
return
# Create a ZMQ SUB socket
sub_socket = self.context.socket(zmq.SUB)
sub_socket.setsockopt_string(zmq.SUBSCRIBE, '')
sub_socket.connect(f"tcp://{self.host}:{self.pub_port}")
# Set up status task
self.sub_socket = sub_socket
self._status_callbacks = [callback]
self._status_running = True
self._status_task = asyncio.create_task(self._status_listener())
async def unsubscribe_from_status(self) -> None:
"""Stop receiving status updates asynchronously."""
if not self._status_running:
return
# Stop the status task
self._status_running = False
if self._status_task:
self._status_task.cancel()
try:
await self._status_task
except asyncio.CancelledError:
pass
# Close the socket
if self.sub_socket:
self.sub_socket.close()
self.sub_socket = None
async def _status_listener(self) -> None:
"""Background task to listen for status updates."""
while self._status_running:
try:
# Receive message
status = await self.sub_socket.recv_pyobj()
# Store status
self._last_status = status
# Call callbacks
for callback in self._status_callbacks:
callback(status)
except asyncio.CancelledError:
# Task was cancelled
break
except Exception as e:
print(f"Error in status listener: {e}")
async def get_status(self) -> Dict[str, Any]:
"""
Get the current status from the server asynchronously.
Returns:
The current system status
"""
if self._last_status is not None:
return self._last_status
return await self.send_command("get_status")
async def insert_detector(self) -> str:
"""
Insert the detector into the beam asynchronously.
Returns:
Success message
"""
return await self.send_command("insert")
async def retract_detector(self) -> str:
"""
Retract the detector from the beam asynchronously.
Returns:
Success message
"""
return await self.send_command("retract")
async def power_supply_on(self) -> str:
"""
Turn on the detector power supply asynchronously.
Returns:
Success message
"""
return await self.send_command("power_on")
async def power_supply_off(self) -> str:
"""
Turn off the detector power supply asynchronously.
Returns:
Success message
"""
return await self.send_command("power_off")
async def read_voltages(self) -> Dict[str, List[float]]:
"""
Read the power supply voltages asynchronously.
Returns:
Dictionary with voltage values
"""
return await self.send_command("read_voltages")
# KTest command convenience methods
async def reset_connection(self) -> Any:
"""
Reset the connection between the k_test server and the detector asynchronously.
Returns:
Server response
"""
return await self.send_command(KTestCommand.RESET_CONNECTION)
async def start_daq(self) -> Any:
"""
Start the data acquisition process asynchronously.
Returns:
Server response
"""
return await self.send_command(KTestCommand.RESTART_DAQ)
async def stop_daq(self) -> Any:
"""
Stop the data acquisition process asynchronously.
Returns:
Server response
"""
return await self.send_command(KTestCommand.STOP_DAQ)
async def set_exposure_mode(self, exposure_mode: ExposureModes) -> Any:
"""
Set the exposure mode for the detector asynchronously.
Args:
exposure_mode: The exposure mode to set
Returns:
Server response
"""
return await self.send_command(KTestCommand.SET_EXPOSURE_MODE, exposure_mode)
async def set_trigger_mode(self, trigger_mode: TriggerModes) -> Any:
"""
Set the trigger mode for the detector asynchronously.
Args:
trigger_mode: The trigger mode to set
Returns:
Server response
"""
return await self.send_command(KTestCommand.SET_TRIGGER_MODE, trigger_mode)
async def set_integration_time(self, integration_time_ms: float) -> Any:
"""
Set the integration time in milliseconds asynchronously.
Args:
integration_time_ms: The integration time in milliseconds
Returns:
Server response
"""
return await self.send_command(KTestCommand.SET_INTEGRATION_TIME, integration_time_ms)
async def load_parameter_file(self, file_path: str) -> Any:
"""
Load a parameter file for the detector asynchronously.
Args:
file_path: Path to the parameter file
Returns:
Server response
"""
return await self.send_command(KTestCommand.LOAD_PARAMETER_FILE, file_path)
async def close(self) -> None:
"""Close the client connections asynchronously."""
# Unsubscribe if needed
await self.unsubscribe_from_status()
# Close command socket
if self.cmd_socket:
self.cmd_socket.close()
# Terminate context
if self.context:
self.context.term()
# Example usage # Example usage
if __name__ == "__main__": if __name__ == "__main__":
# Synchronous example logging.basicConfig(level=logging.INFO)
client = Client() client = Client()
print("Synchronous client:")
try: try:
print("Status:", client.get_status()) print("Status:", client.get_status())
print("Voltages:", client.read_voltages()) print("Acquiring:", client.is_acquiring())
except CommandException as e: print("Frame rate:", client.get_frame_rate())
print(f"Command failed: {e}") except CommandError as e:
client.close() print(f"Error: {e}")
finally:
# Asynchronous example client.close()
async def main():
client = AsyncClient()
print("\nAsynchronous client:")
try:
status = await client.get_status()
print("Status:", status)
voltages = await client.read_voltages()
print("Voltages:", voltages)
except CommandException as e:
print(f"Command failed: {e}")
await client.close()
asyncio.run(main())

View File

@@ -1,191 +0,0 @@
import socket
import time
import enum
class KTestError(Exception):
"""Custom exception for KTestClient errors."""
pass
class ExposureModes(enum.IntEnum):
"""Exposure modes for the detector."""
ROLLING_SHUTTER = 0
"""Rolling shutter mode."""
ROLLING_SHUTTER_WITH_PAUSE = 1
"""Rolling shutter with pause mode."""
GLOBAL_SHUTTER = 2
"""Global shutter mode."""
GLOBAL_SHUTTER_CDS = 3
"""Global shutter with Correlated Double Sampling (CDS) using two images."""
class TriggerModes(enum.IntEnum):
"""Trigger modes for the detector (more implemented but not supported by k_test server)."""
INTERNAL_TRIGGER = 0
"""Internal trigger mode."""
EXTERNAL_TRIGGER = 1
"""External trigger mode."""
class KTestCommand(enum.Enum):
RESET_CONNECTION = "resetdetectorconnection"
"""Reset the connection from k_test to the detector."""
# PRINT_EVERY_NTH = "printeverynth"
# """Set print frequency (requires value)"""
TEST_MODE = "testmode"
"""Enable/disable test mode (0 or 1)"""
LOAD_PARAMETER_FILE = "parameterfile"
"""Load parameter file (requires filename)"""
READ_REGISTER = "readregister"
"""Read register (requires address)"""
WRITE_REGISTER = "writeregister"
"""Write register (requires address and value)"""
SET_ADC_CLOCK_FREQ = "setadcclockfreq"
"""Set ADC clock frequency in MHz (50-100)"""
SET_OUTPUT_DIR = "setoutputdir"
"""Set output directory (requires path)"""
ENABLE_FILE_WRITING = "enablefilewriting"
"""Enable file writing"""
DISABLE_FILE_WRITING = "disablefilewriting"
"""Disable file writing"""
SET_EXPOSURE_MODE = "setexposuremode"
"""Set exposure mode (0-3)"""
SET_TRIGGER_MODE = "settriggermode"
"""Set trigger mode (0-2)"""
SET_INTEGRATION_TIME = "setintegrationtime"
"""Set integration time in ms"""
SET_ADC_DATA_DELAY = "setadcdatadelay"
"""Set ADC data delay"""
RESTART_DAQ = "restartdaq"
"""Restart DAQ"""
STOP_DAQ = "stopdaq"
"""Stop DAQ"""
KTEST_COMMAND_ARGS = {
KTestCommand.RESET_CONNECTION: (),
# KTestCommand.PRINT_EVERY_NTH: (int,),
KTestCommand.TEST_MODE: (int,),
KTestCommand.LOAD_PARAMETER_FILE: (str,),
KTestCommand.READ_REGISTER: (int,),
KTestCommand.WRITE_REGISTER: (int, int),
KTestCommand.SET_ADC_CLOCK_FREQ: (float,),
KTestCommand.SET_OUTPUT_DIR: (str,),
KTestCommand.ENABLE_FILE_WRITING: (),
KTestCommand.DISABLE_FILE_WRITING: (),
KTestCommand.SET_EXPOSURE_MODE: (ExposureModes,),
KTestCommand.SET_TRIGGER_MODE: (TriggerModes,),
KTestCommand.SET_INTEGRATION_TIME: (float,),
KTestCommand.SET_ADC_DATA_DELAY: (int,),
KTestCommand.RESTART_DAQ: (),
KTestCommand.STOP_DAQ: (),
}
class KTestClient:
def __init__(self, host='localhost', port=42003):
self.host = host
self.port = port
def send_command(self, command: KTestCommand, *args) -> str:
"""
Send a command to the KTestServer and return the response.
Args:
command (KTestCommand): Command enum to send
*args: Arguments for the command
Returns:
str: response_message
"""
if command not in KTEST_COMMAND_ARGS:
raise ValueError(f"Invalid command: {command}")
expected_args = KTEST_COMMAND_ARGS[command]
if len(args) != len(expected_args):
raise ValueError(f"Invalid number of arguments for {command.value}. Expected {len(expected_args)}, got {len(args)}.")
# Validate argument types
for i, (arg, expected_type) in enumerate(zip(args, expected_args)):
if not isinstance(arg, expected_type):
raise TypeError(f"Invalid argument type for argument {i+1} of {command.value}. Expected {expected_type.__name__}, got {type(arg).__name__}.")
# Construct command string
command_str = command.value
if args:
command_str += ' ' + ' '.join(str(arg) for arg in args)
ret_code, ret_msg = self.send_command_str(command_str)
if ret_code != 0:
raise KTestError(f"Command '{command_str}' failed with code {ret_code}: {ret_msg}")
return ret_msg
def send_command_str(self, command) -> tuple[int, str]:
"""
Send a command to the KTestServer and return the response.
Args:
command (str): Command string to send
Returns:
tuple: (return_code, response_message)
"""
sock = None
try:
# Create TCP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((self.host, self.port))
# Send command
sock.send(command.encode('utf-8'))
# Receive response
response = b""
while True:
chunk = sock.recv(4096)
if not chunk:
break
response += chunk
sock.close()
# Parse response - return code is at the beginning
response_str = response.decode('utf-8')
lines = response_str.strip().split('\n')
# Extract return code (first number in response)
return_code = None
for line in lines:
if line.strip().isdigit() or (line.strip().startswith('-') and line.strip()[1:].isdigit()):
return_code = int(line.strip())
break
if return_code is None:
raise KTestError(f"Invalid response from server: {response_str}")
return return_code, response_str
except ConnectionRefusedError:
raise ConnectionRefusedError("Connection error: Server is not running or unreachable. Start the k_test server first.")
except Exception as e:
raise e
finally:
try:
if sock is not None:
sock.close()
except:
pass

View File

@@ -1,8 +1,8 @@
from .Client import Client, AsyncClient from .Client import Client, ExposureModes, TriggerModes, CommandError
from .KTestClient import KTestCommand, KTestError, ExposureModes, TriggerModes
__all__ = [ __all__ = [
'Client', 'AsyncClient', 'Client',
'KTestCommand', 'KTestError', 'ExposureModes',
'ExposureModes', 'TriggerModes' 'TriggerModes',
] 'CommandError',
]

View File

@@ -0,0 +1 @@
from .receiver import AcquisitionProcess

View File

@@ -0,0 +1,366 @@
"""
Dedicated subprocess for receiving detector UDP data at high throughput.
Ports MainDAQThread + GetImages from KTestFunctions.cxx (lines 868-1047).
Runs as a multiprocessing.Process, communicates via locking_shmring and a pipe.
"""
import struct
import time
import logging
import multiprocessing
from multiprocessing.connection import Connection
import numpy as np
from ..detector.chip_config import ChipConfig, TEAM1K_CONFIG
logger = logging.getLogger(__name__)
# Commands sent over the pipe from the main process
CMD_START = "start"
CMD_STOP = "stop"
CMD_SHUTDOWN = "shutdown"
class AcquisitionProcess:
"""
Manages a dedicated subprocess for UDP data acquisition.
The subprocess receives raw UDP packets from the detector, assembles them
into complete frames, and writes them to a shared memory ring buffer
(locking_shmring). The main process reads from the ring buffer for PVA
streaming and file writing.
"""
def __init__(self, detector_ip: str, data_port: int = 41000,
ring_name: str = "team1k_frames", num_ring_slots: int = 32,
chip_config: ChipConfig = TEAM1K_CONFIG):
self._detector_ip = detector_ip
self._data_port = data_port
self._ring_name = ring_name
self._num_ring_slots = num_ring_slots
self._config = chip_config
self._process: multiprocessing.Process | None = None
self._parent_conn: Connection | None = None
@property
def ring_name(self) -> str:
return self._ring_name
@property
def is_alive(self) -> bool:
return self._process is not None and self._process.is_alive()
def start_process(self) -> None:
"""Start the acquisition subprocess."""
if self.is_alive:
logger.warning("Acquisition process already running")
return
parent_conn, child_conn = multiprocessing.Pipe()
self._parent_conn = parent_conn
self._process = multiprocessing.Process(
target=_acquisition_main,
args=(
self._detector_ip,
self._data_port,
self._ring_name,
self._num_ring_slots,
self._config,
child_conn,
),
daemon=True,
name="team1k-acquisition",
)
self._process.start()
child_conn.close() # Parent doesn't need the child end
logger.info("Acquisition subprocess started (PID %d)", self._process.pid)
def start_acquisition(self) -> None:
"""Tell the subprocess to start acquiring data."""
if self._parent_conn:
self._parent_conn.send(CMD_START)
def stop_acquisition(self) -> None:
"""Tell the subprocess to stop acquiring data (but keep running)."""
if self._parent_conn:
self._parent_conn.send(CMD_STOP)
def shutdown(self, timeout: float = 5.0) -> None:
"""Shut down the subprocess completely."""
if self._parent_conn:
self._parent_conn.send(CMD_SHUTDOWN)
self._parent_conn.close()
self._parent_conn = None
if self._process and self._process.is_alive():
self._process.join(timeout=timeout)
if self._process.is_alive():
logger.warning("Acquisition process did not exit, terminating")
self._process.terminate()
self._process.join(timeout=2.0)
self._process = None
logger.info("Acquisition subprocess shut down")
def _acquisition_main(detector_ip: str, data_port: int,
ring_name: str, num_slots: int,
config: ChipConfig, cmd_pipe: Connection) -> None:
"""
Main function running in the acquisition subprocess.
This must not import asyncio or anything that would interfere with the
tight recv loop.
"""
# Import here to avoid importing in the main process unnecessarily
import socket as _socket
import select as _select
from locking_shmring import RingBufferWriter
logging.basicConfig(level=logging.INFO)
proc_logger = logging.getLogger("team1k.acquisition")
# Create shared memory ring buffer
writer = RingBufferWriter.create_numpy(
ring_name, num_slots, (config.image_nx, config.image_ny), np.uint16
)
proc_logger.info("Ring buffer '%s' created: %d slots of %dx%d uint16",
ring_name, num_slots, config.image_nx, config.image_ny)
# Create UDP data socket
sock = _socket.socket(_socket.AF_INET, _socket.SOCK_DGRAM)
sock.bind(('', 0))
# Try to set large socket buffer
for buf_size in [400 * 1024 * 1024, 5 * 1024 * 1024]:
try:
sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_RCVBUF, buf_size)
actual = sock.getsockopt(_socket.SOL_SOCKET, _socket.SO_RCVBUF)
if actual >= buf_size // 2:
proc_logger.info("Socket buffer set to %d bytes (actual: %d)", buf_size, actual)
break
except OSError:
continue
local_port = sock.getsockname()[1]
dest_addr = (detector_ip, data_port)
# Perform loopback to register with detector
from ..detector.udp_transport import SECURITY_KEY
_perform_loopback(sock, dest_addr, SECURITY_KEY, proc_logger)
# Pre-allocate packet buffer
packet_buf = bytearray(config.bytes_per_packet)
header_view = memoryview(packet_buf)[:config.packet_header_size]
acquiring = False
running = True
frame_count = 0
proc_logger.info("Acquisition subprocess ready (local port %d)", local_port)
while running:
# Check for commands (non-blocking)
if cmd_pipe.poll(0.1 if not acquiring else 0):
try:
cmd = cmd_pipe.recv()
except EOFError:
break
if cmd == CMD_START:
acquiring = True
proc_logger.info("Acquisition started")
# Clear any stale data in socket
_clear_socket(sock)
elif cmd == CMD_STOP:
acquiring = False
proc_logger.info("Acquisition stopped (frame_count=%d)", frame_count)
elif cmd == CMD_SHUTDOWN:
running = False
continue
if not acquiring:
continue
# Acquire one complete frame
slot = writer.start_write(timeout_ms=100)
if slot is None:
proc_logger.warning("Ring buffer full, dropping frame")
# Drain one frame's worth of packets to stay in sync
_skip_frame(sock, config)
continue
frame_arr = slot.data_as_numpy((config.image_ny, config.image_nx), np.uint16)
frame_arr_flat = frame_arr.ravel()
frame_id, ok = _assemble_frame(
sock, packet_buf, frame_arr_flat, config, proc_logger
)
if ok:
slot.commit(frame_id=frame_count, timestamp_ns=time.time_ns())
frame_count += 1
else:
slot.abort()
# Clean shutdown
writer.signal_shutdown()
writer.close()
sock.close()
proc_logger.info("Acquisition subprocess exiting (total frames: %d)", frame_count)
def _perform_loopback(sock, dest_addr, security_key, log) -> bool:
"""Perform loopback test to register with detector data port."""
import struct as _struct
nwords = 20
parts = [_struct.pack('<II', security_key, 0)] # key + dst_table_offset=0
for i in range(nwords * 2):
parts.append(_struct.pack('<I', (i - 2) & 0x7FFFFFFF))
packet = b''.join(parts)
expected_size = (2 * nwords + 2) * 4
sock.sendto(packet[:expected_size], dest_addr)
import select as _select
ready, _, _ = _select.select([sock], [], [], 0.12)
if not ready:
log.warning("Loopback timeout — detector may not be reachable at %s", dest_addr)
return False
response = sock.recv(expected_size)
if len(response) != expected_size:
log.warning("Loopback size mismatch: expected %d, got %d", expected_size, len(response))
return False
# Verify first (nwords+2) words match
for i in range(nwords + 2):
sent = _struct.unpack_from('<I', packet, i * 4)[0]
recv = _struct.unpack_from('<I', response, i * 4)[0]
if sent != recv:
log.error("Loopback mismatch at word %d: 0x%08x != 0x%08x", i, sent, recv)
return False
log.info("Loopback test passed")
return True
def _clear_socket(sock) -> None:
"""Drain all pending data from socket."""
sock.setblocking(False)
try:
while True:
try:
sock.recv(65536)
except BlockingIOError:
break
finally:
sock.setblocking(True)
def _skip_frame(sock, config: ChipConfig) -> None:
"""Drain packets for one frame to stay in sync."""
sock.setblocking(False)
try:
for _ in range(config.npackets_per_image):
try:
sock.recv(config.bytes_per_packet)
except BlockingIOError:
break
finally:
sock.setblocking(True)
def _assemble_frame(sock, packet_buf: bytearray, frame_flat: np.ndarray,
config: ChipConfig, log) -> tuple[int, bool]:
"""
Assemble one complete frame from UDP packets.
Ported from KTestFunctions::GetImages (lines 974-1047).
Each packet: 8-byte header + pixel data
Header format: [frame_number(u32, stored as 2xu16), reserved(u16), packet_number(u16)]
Args:
sock: UDP socket to receive from.
packet_buf: Pre-allocated buffer for one packet.
frame_flat: Flat numpy view of the frame to write into.
config: Chip configuration.
log: Logger.
Returns:
Tuple of (frame_number, success).
"""
import select as _select
npackets = config.npackets_per_image
npixels_per_pkt = config.npixels_per_packet
expected_size = config.bytes_per_packet
header_size = config.packet_header_size
expected_packet_num = 0
last_frame_number = 0
realigning = False
frame_number = 0
i = 0
while i < npackets:
# Receive a packet
while True:
ready, _, _ = _select.select([sock], [], [], 0.12)
if not ready:
continue
nbytes = sock.recv_into(packet_buf)
if nbytes == 0:
continue
elif nbytes != expected_size:
log.error("Wrong packet size: expected %d, got %d", expected_size, nbytes)
return 0, False
break
# Parse header
frame_number = struct.unpack_from('<I', packet_buf, 0)[0]
packet_number = struct.unpack_from('<H', packet_buf, 6)[0]
# New frame resets expected packet number
if frame_number != last_frame_number:
expected_packet_num = 0
last_frame_number = frame_number
# Handle realignment: discard packets until we find the start of an even frame
if realigning:
if frame_number % 2 == 0 and packet_number == 0:
expected_packet_num = packet_number
realigning = False
i = 0 # Restart frame assembly
else:
continue # Keep reading packets
# Check packet ordering
if expected_packet_num != packet_number:
log.warning("Packet number mismatch at %d: expected %d, got %d (frame %d)",
i, expected_packet_num, packet_number, frame_number)
log.info("Realigning to next even frame...")
realigning = True
i = 0
continue
expected_packet_num += 1
# Copy pixel data into frame buffer
pixel_offset = i * npixels_per_pkt
pixel_data = packet_buf[header_size:header_size + npixels_per_pkt * 2]
frame_flat[pixel_offset:pixel_offset + npixels_per_pkt] = np.frombuffer(
pixel_data, dtype=np.uint16
)
i += 1
return frame_number, True

View File

@@ -0,0 +1,7 @@
from .udp_transport import UDPSocket, SECURITY_KEY
from .registers import RegisterInterface
from .data_port import DataPort
from .chip_config import ChipConfig, TEAM1K_CONFIG, configure_chip
from .commands import DetectorCommands
from .adc import ADCController
from .parameter_file import apply_parameter_file

231
src/team1k/detector/adc.py Normal file
View File

@@ -0,0 +1,231 @@
"""
ADC Si570 clock programming and I2C interface.
Ports KTestFunctions::SetADCClockFreq (lines 186-336) and
KTestFunctions::WriteADCI2C (lines 131-175) from KTestFunctions.cxx.
"""
import time
import math
import logging
from .registers import RegisterInterface
logger = logging.getLogger(__name__)
# I2C command constants (from C++ #defines)
CMD_START = 0x53
CMD_STOP = 0x51
CMD_RD_NP = 0x72 # read with no-acknowledge (last byte)
CMD_RD_P = 0x52 # read with acknowledge
CMD_NOP = 0x4E
CMD_I2CRST = 0x49
# Si570 constants
_I2C_ADDRESS = 0x55
_FDCO_TARGET = 5260.0 # MHz
_HS_VALUES = [4, 5, 6, 7, None, 9, None, 11] # index 4,6 are invalid
_FXTAL = 114.286 # MHz (hardcoded, bypassing recall)
class ADCController:
"""Si570 clock programming and ADC interface."""
def __init__(self, registers: RegisterInterface):
self._reg = registers
self.current_freq_mhz: float = 100.0
def write_adc_i2c(self, tx_data: list[int],
print_info: bool = False) -> tuple[bool, list[int]]:
"""
Write I2C data via FPGA register interface.
Ported from KTestFunctions::WriteADCI2C (lines 131-175).
The FPGA has an I2C controller accessible through registers 90 (write)
and 91 (read). Data is written to a 128-entry RAM, then execution is
triggered. Results are read back from the same RAM.
Args:
tx_data: List of I2C command bytes to send (max 123 entries).
print_info: Whether to log detailed I2C transaction info.
Returns:
Tuple of (success, rx_data_list).
"""
n = len(tx_data)
if n > 123:
logger.error("I2C write list too long: %d (max 123)", n)
return False, []
# Write the command data to FPGA I2C RAM (128 entries)
for i in range(128):
value = tx_data[i] if i < n else (0x100 | CMD_NOP)
if i < n and print_info:
logger.debug("I2C TX %d) 0x%x", i, tx_data[i])
# Write sequence: clear, set write-enable, clear
self._reg.write_register(90, (i << 16) | value)
self._reg.write_register(90, 0x40000000 | (i << 16) | value)
self._reg.write_register(90, (i << 16) | value)
# Trigger I2C execution (write n entries)
self._reg.write_register(90, (n << 16))
self._reg.write_register(90, 0x80000000 | (n << 16))
self._reg.write_register(90, (n << 16))
# Wait for I2C transaction to complete
time.sleep(1.0)
# Read back results
ok = True
rx_data = []
for i in range(n):
self._reg.write_register(90, (i << 16))
self._reg.write_register(90, (i << 16))
value = self._reg.read_register(91)
rx_data.append(value & 0x1FF)
if print_info:
logger.debug("I2C RX %d) tx=0x%x rx=0x%x", i, tx_data[i], value)
# Check for error: bit 8 set means NACK (except for CMD_RD_NP)
if tx_data[i] != (0x100 | CMD_RD_NP) and (value & 0x100) != 0:
logger.error("I2C error at %d: tx=0x%x rx=0x%x", i, tx_data[i], value)
ok = False
return ok, rx_data
def set_clock_freq(self, freq_mhz: float, print_info: bool = True) -> bool:
"""
Set ADC clock frequency via Si570 I2C programming.
Ported from KTestFunctions::SetADCClockFreq (lines 186-336).
The Si570 is a programmable oscillator controlled via I2C.
Algorithm:
1. Find best HS divider and N1 values for target frequency
2. Compute new RFREQ value from fdco/fxtal
3. Build I2C sequence: freeze DCO -> write registers -> unfreeze -> apply
Args:
freq_mhz: Target frequency in MHz (50-100).
print_info: Whether to log computation details.
Returns:
True on success.
"""
if freq_mhz < 50 or freq_mhz > 100:
logger.error("Frequency out of range: %.1f MHz (must be 50-100)", freq_mhz)
return False
# Find best HS divider and N1 combination
best_diff = None
best_hs_index = 0
best_hs_div = 4
best_n1 = 2
for i in range(8):
if _HS_VALUES[i] is None: # skip invalid indices 4 and 6
continue
hs = _HS_VALUES[i]
# C++ formula: (int(fdco/freq/hs+1)/2*2) & 0x7f
n_temp = (int(_FDCO_TARGET / freq_mhz / hs + 1) // 2 * 2) & 0x7F
diff = abs(_FDCO_TARGET - freq_mhz * hs * n_temp)
if best_diff is None or diff <= best_diff:
best_hs_index = i
best_hs_div = hs
best_n1 = n_temp
best_diff = diff
fdco = freq_mhz * best_hs_div * best_n1
fxtal = _FXTAL
# Compute new RFREQ register value
new_value = int(fdco / fxtal * (2 ** 28))
if print_info:
logger.info("Setting ADC clock to %.1f MHz", freq_mhz)
logger.info(" fdco=%.1f MHz, hs_div=%d, n1=%d",
fdco, best_hs_div, best_n1)
logger.info(" RFREQ register: 0x%x", new_value)
# Build I2C command sequence
i2c_addr_write = _I2C_ADDRESS << 1
# NOP padding at the start
seq = [
0x100 | CMD_NOP, # NOP
0x100 | CMD_NOP, # NOP
0x100 | CMD_NOP, # NOP
0x100 | CMD_NOP, # NOP
]
# Phase 1: Freeze DCO (register 137 = 0x10)
seq += [
0x100 | CMD_NOP, # NOP
0x100 | CMD_START, # START
i2c_addr_write, # Si570 address, write
137, # DCO register address
0x10, # Freeze DCO
0x100 | CMD_STOP, # STOP
0x100 | CMD_NOP, # NOP
0x100 | CMD_NOP, # NOP
]
# Phase 2: Write new HS_DIV, N1, and RFREQ (registers 7-12)
reg7 = ((best_hs_index & 0x7) << 5) | (((best_n1 - 1) >> 2) & 0x1F)
reg8 = (((best_n1 - 1) & 0x3) << 6) | ((new_value >> 32) & 0x3F)
reg9 = (new_value >> 24) & 0xFF
reg10 = (new_value >> 16) & 0xFF
reg11 = (new_value >> 8) & 0xFF
reg12 = new_value & 0xFF
seq += [
0x100 | CMD_NOP, # NOP
0x100 | CMD_START, # START
i2c_addr_write, # Si570 address, write
7, # Start register address
reg7, # HS_DIV[2:0] | N1[6:2]
reg8, # N1[1:0] | RFREQ[37:32]
reg9, # RFREQ[31:24]
reg10, # RFREQ[23:16]
reg11, # RFREQ[15:8]
reg12, # RFREQ[7:0]
0x100 | CMD_STOP, # STOP
0x100 | CMD_NOP, # NOP
0x100 | CMD_NOP, # NOP
]
# Phase 3: Unfreeze DCO (register 137 = 0x00)
seq += [
0x100 | CMD_NOP, # NOP
0x100 | CMD_START, # START
i2c_addr_write, # Si570 address, write
137, # DCO register address
0x00, # Unfreeze DCO
0x100 | CMD_STOP, # STOP
0x100 | CMD_NOP, # NOP
0x100 | CMD_NOP, # NOP
]
# Phase 4: Apply new frequency (register 135 = 0x40)
seq += [
0x100 | CMD_NOP, # NOP
0x100 | CMD_START, # START
i2c_addr_write, # Si570 address, write
135, # NewFreq register
0x40, # Apply
0x100 | CMD_STOP, # STOP
0x100 | CMD_NOP, # NOP
0x100 | CMD_NOP, # NOP
0x100 | CMD_NOP, # NOP
]
# Execute the I2C sequence
ok, _ = self.write_adc_i2c(seq, print_info=print_info)
if ok:
self.current_freq_mhz = freq_mhz
logger.info("ADC clock frequency set to %.1f MHz", freq_mhz)
else:
logger.error("Failed to set ADC clock frequency")
return ok

View File

@@ -0,0 +1,88 @@
"""
Chip configuration constants and register setup.
Ports KTestFunctions::SetChipType("team1k") from KTestFunctions.cxx (lines 527-545).
"""
import dataclasses
import logging
from .registers import RegisterInterface
logger = logging.getLogger(__name__)
@dataclasses.dataclass(frozen=True)
class ChipConfig:
"""Immutable chip configuration."""
image_nx: int = 1024
image_ny: int = 1024
nimages_per_buffer: int = 16
n64words_per_packet: int = 1024 # 4 rows per packet
npixels_per_packet: int = 4096 # 4 * n64words_per_packet
npackets_per_image: int = 256 # image_ny / 4
bytes_per_pixel: int = 2 # uint16
packet_header_size: int = 8 # 8-byte header per UDP packet
# Register values for chip initialization (from SetChipType)
reg_ftc_rst_mode: int = 0 # register 21
reg_preburner: int = 7 << 16 # register 25
reg_pixels_rows: int = (64 << 16) | 1024 # register 26: 64 cols/adc, 1024 rows
reg_packets_words: int = (255 << 16) | 1023 # register 40: npackets-1, n64words-1
@property
def bytes_per_packet(self) -> int:
"""Total bytes per UDP data packet (header + pixel data)."""
return self.packet_header_size + self.npixels_per_packet * self.bytes_per_pixel
@property
def image_size_bytes(self) -> int:
"""Bytes per image frame."""
return self.image_nx * self.image_ny * self.bytes_per_pixel
@property
def buffer_size_bytes(self) -> int:
"""Bytes per multi-image buffer."""
return self.image_size_bytes * self.nimages_per_buffer
@property
def npixels_per_buffer(self) -> int:
return self.image_nx * self.image_ny * self.nimages_per_buffer
@property
def ntransfers_per_file(self) -> int:
"""Number of buffer transfers per ~2 GB file."""
return (2 * 1024 * 1024 * 1024) // (self.bytes_per_pixel * self.npixels_per_buffer)
# Default team1k configuration
TEAM1K_CONFIG = ChipConfig()
def configure_chip(registers: RegisterInterface,
config: ChipConfig = TEAM1K_CONFIG) -> bool:
"""
Write chip type registers to the detector.
Args:
registers: RegisterInterface for communicating with the detector.
config: Chip configuration to apply.
Returns:
True on success.
"""
logger.info("Configuring chip: %dx%d, %d packets/image",
config.image_nx, config.image_ny, config.npackets_per_image)
ok = registers.write_register(21, config.reg_ftc_rst_mode)
ok &= registers.write_register(25, config.reg_preburner)
ok &= registers.write_register(26, config.reg_pixels_rows)
ok &= registers.write_register(40, config.reg_packets_words)
if ok:
logger.info("Chip configured successfully")
else:
logger.error("Chip configuration failed")
return ok

View File

@@ -0,0 +1,170 @@
"""
High-level detector commands.
Ports the command methods from KTestFunctions.cxx:
SetExposureMode, SetTriggerMode, SetIntegrationTime, SetADCDataDelay,
SetADCDataAveraging, EnableFPGATestData, StartNStopDataFlow, etc.
"""
import time
import math
import logging
from .registers import RegisterInterface
logger = logging.getLogger(__name__)
class DetectorCommands:
"""High-level detector control operations via register writes."""
def __init__(self, registers: RegisterInterface):
self._reg = registers
self._exposure_mode: int = 0
@property
def exposure_mode(self) -> int:
return self._exposure_mode
def set_exposure_mode(self, mode: int) -> bool:
"""
Set exposure mode.
Args:
mode: 0=rolling shutter, 1=rolling shutter with pause,
2=global shutter, 3=global shutter CDS (double image).
Writes register 22.
"""
self._exposure_mode = mode
ok = self._reg.write_register(22, mode & 0x3)
if ok:
logger.info("Set exposure mode to %d", mode)
else:
logger.error("Failed to set exposure mode to %d", mode)
return ok
def set_trigger_mode(self, external: bool, polarity: bool = False,
window_trigger: bool = False) -> bool:
"""
Set trigger mode.
Args:
external: True for external trigger, False for internal.
polarity: Trigger edge polarity (True=falling, False=rising).
window_trigger: True for external trigger window mode.
Writes register 23.
"""
value = (1 if external else 0)
value |= (2 if window_trigger else 0)
value |= (4 if polarity else 0)
ok = self._reg.write_register(23, value)
if ok:
mode_str = "internal"
if external and window_trigger:
mode_str = "external window"
elif external:
mode_str = f"external {'falling' if polarity else 'rising'}-edge"
logger.info("Set trigger mode to %s", mode_str)
return ok
def set_integration_time(self, time_ms: float) -> bool:
"""
Set integration time.
Args:
time_ms: Integration time in milliseconds.
Converted to 20us steps internally.
Writes register 24.
"""
value = int(math.floor((time_ms + 10e-6) / 20e-6)) if time_ms > 0 else 0
ok = self._reg.write_register(24, value)
if ok:
logger.info("Set integration time to %.3f ms (register value: %d)", time_ms, value)
return ok
def set_adc_data_delay(self, delay: int) -> bool:
"""
Set ADC data delay for all 4 ADCs.
Args:
delay: Delay value.
Writes registers 32-35.
"""
ok = True
for i in range(4):
ok &= self._reg.write_register(32 + i, delay)
if ok:
logger.info("Set ADC data delay to 0x%x", delay)
return ok
def set_adc_data_averaging(self, power_of_2: int) -> bool:
"""
Set ADC data averaging.
Args:
power_of_2: Averaging factor as power of 2 (0-7).
Writes register 36.
"""
ok = self._reg.write_register(36, power_of_2 & 0x7)
if ok:
logger.info("Set ADC data averaging to 2^%d", power_of_2)
return ok
def enable_fpga_test_data(self, enable: bool = True) -> bool:
"""
Enable or disable FPGA test data generation.
Writes register 37.
"""
ok = self._reg.write_register(37, 1 if enable else 0)
logger.info("FPGA test data %s", "enabled" if enable else "disabled")
return ok
def start_data_flow(self) -> bool:
"""
Start data acquisition.
Resets frame counter and timestamp (register 20 = 0xC0000000),
then enables DAQ (register 20 = 0x00000001).
"""
ok = self._reg.write_register(20, 0xC0000000) # reset
time.sleep(0.00001) # 10 us
ok |= self._reg.write_register(20, 0x00000001) # enable
if ok:
logger.info("Data flow started")
return ok
def stop_data_flow(self) -> bool:
"""
Stop data acquisition.
Resets frame counter and timestamp (register 20 = 0xC0000000).
"""
ok = self._reg.write_register(20, 0xC0000000)
if ok:
logger.info("Data flow stopped")
return ok
def read_firmware_version(self) -> int:
"""Read firmware version from register 0xFFFF."""
return self._reg.read_register(0xFFFF)
def write_adc_spi(self, value: int) -> tuple[bool, int]:
"""
Write ADC SPI value and read back.
Writes register 80, waits 100ms, reads register 81.
Returns:
Tuple of (success, return_value).
"""
ok = self._reg.write_register(80, value)
time.sleep(0.1)
return_value = self._reg.read_register(81)
logger.info("ADC SPI write: 0x%x, read: 0x%x", value, return_value)
return ok, return_value

View File

@@ -0,0 +1,146 @@
"""
Data port setup and packet reception.
Ports OpenDataPort, SendDataPortLoopback, and GetDataPortData
from NetworkInterface.cxx (lines 162-268).
"""
import struct
import logging
from .udp_transport import UDPSocket, SECURITY_KEY
logger = logging.getLogger(__name__)
# Default buffer sizes
_LARGE_BUFFER = 400 * 1024 * 1024 # 400 MB
_SMALL_BUFFER = 5 * 1024 * 1024 # 5 MB fallback
# Loopback test constants
_LOOPBACK_NWORDS = 20
_LOOPBACK_PACKET_SIZE = (2 * _LOOPBACK_NWORDS + 2) * 4 # 168 bytes
class DataPortError(Exception):
"""Error during data port operations."""
pass
class DataPort:
"""
Manages the data channel (UDP) to the detector.
The data port receives high-throughput image data as UDP packets.
After setup, a loopback test registers the client with the detector.
"""
DEFAULT_PORT = 41000
def __init__(self, detector_ip: str, port: int = DEFAULT_PORT,
dst_table_offset: int = 0,
buffer_size: int = _LARGE_BUFFER):
"""
Open a data port and perform the loopback registration.
Args:
detector_ip: Detector IP address.
port: Detector data port (default 41000).
dst_table_offset: Destination table offset for detector routing.
buffer_size: Kernel receive buffer size (400 MB default, falls back to 5 MB).
"""
# Try large buffer first, fall back to smaller
try:
self._socket = UDPSocket(detector_ip, port, recv_buffer_size=buffer_size)
actual = self._socket._sock.getsockopt(
__import__('socket').SOL_SOCKET, __import__('socket').SO_RCVBUF
)
# Linux doubles the requested value; check if we got at least half
if actual < buffer_size // 2:
raise OSError(f"Buffer too small: {actual}")
except OSError:
logger.warning("Could not set socket buffer to %d bytes, trying %d",
buffer_size, _SMALL_BUFFER)
self._socket = UDPSocket(detector_ip, port, recv_buffer_size=_SMALL_BUFFER)
# Pre-allocate recv buffer for the maximum packet size
self._recv_buf = bytearray(65536)
# Perform loopback to register with detector
if not self._perform_loopback(dst_table_offset):
self._socket.close()
raise DataPortError("Data port loopback test failed")
logger.info("Data port opened on local port %d", self._socket.local_port)
def _perform_loopback(self, dst_table_offset: int) -> bool:
"""
Send a loopback packet to register this client with the detector.
The detector echoes the packet back to confirm the connection.
Format: [security_key(u32), dst_table_offset(u32), data_words(20 x u32)...]
Total: (2*20+2) * 4 = 168 bytes
"""
# Build loopback packet
parts = [struct.pack('<II', SECURITY_KEY, dst_table_offset)]
for i in range(_LOOPBACK_NWORDS * 2):
parts.append(struct.pack('<I', (i - 2) & 0x7FFFFFFF))
packet = b''.join(parts)[:_LOOPBACK_PACKET_SIZE]
# Send and receive
self._socket.send(packet)
response = self._socket.recv(_LOOPBACK_PACKET_SIZE, timeout_sec=0.12)
if response is None:
logger.error("Loopback timeout")
self._socket.clear_buffer()
return False
if len(response) != _LOOPBACK_PACKET_SIZE:
logger.error("Loopback size mismatch: expected %d, got %d",
_LOOPBACK_PACKET_SIZE, len(response))
self._socket.clear_buffer()
return False
# Verify echo matches for header + first NWORDS+2 words
for i in range(_LOOPBACK_NWORDS + 2):
sent_word = struct.unpack_from('<I', packet, i * 4)[0]
recv_word = struct.unpack_from('<I', response, i * 4)[0]
if sent_word != recv_word:
logger.error("Loopback mismatch at word %d: sent 0x%08x, got 0x%08x",
i, sent_word, recv_word)
self._socket.clear_buffer()
return False
return True
def recv_packet(self, buffer: bytearray | memoryview,
timeout_sec: float = 0.12) -> int:
"""
Receive a single data packet into the provided buffer.
The buffer should be large enough for header + data (e.g., 8200 bytes
for team1k: 8 header + 8192 pixel data).
Args:
buffer: Pre-allocated buffer to receive into.
timeout_sec: Timeout in seconds (0.12 = 120ms default).
Returns:
Number of bytes received, 0 on timeout.
"""
return self._socket.recv_into(buffer, timeout_sec=timeout_sec)
def clear(self) -> None:
"""Drain all buffered data packets."""
self._socket.clear_buffer()
def close(self) -> None:
"""Close the data port."""
self._socket.close()
def __enter__(self):
return self
def __exit__(self, *args):
self.close()

View File

@@ -0,0 +1,179 @@
"""
Parameter file parser.
Ports KTestFunctions::ReadParameterFile from KTestFunctions.cxx (lines 1078-1195).
"""
import logging
from pathlib import Path
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .registers import RegisterInterface
from .commands import DetectorCommands
from .adc import ADCController
logger = logging.getLogger(__name__)
def _parse_int(s: str) -> int:
"""Parse an integer from string, supporting both decimal and hex (0x prefix)."""
s = s.strip()
if 'x' in s or 'X' in s:
return int(s, 16)
return int(s)
def apply_parameter_file(
filepath: str,
commands: 'DetectorCommands',
adc: 'ADCController',
registers: 'RegisterInterface',
restart_daq_callback=None,
) -> tuple[bool, str]:
"""
Parse and apply all settings from a parameter file.
The file format is one setting per line:
keyword value1 [value2 ...]
Lines starting with '--' are comments.
Supported keywords:
ADC_clock_frequency_in_MHz <freq>
ADC_data_delay <delay>
chip_type <name> (ignored — only team1k is supported)
exposure_mode <mode>
trigger_mode <external> <polarity>
integration_time <ms>
digital_signal_polarity <value>
digital_signal_order_7downto0 <value>
digital_signal_order_15downto8 <value>
ADC_order_7downto0 <value>
ADC_order_15downto8 <value>
restart_daq
Args:
filepath: Path to the parameter file.
commands: DetectorCommands instance.
adc: ADCController instance.
registers: RegisterInterface instance.
restart_daq_callback: Optional callable to restart DAQ (for 'restart_daq' keyword).
Returns:
Tuple of (success, output_message).
"""
path = Path(filepath)
if not path.is_file():
msg = f"Parameter file not found: {filepath}"
logger.error(msg)
return False, msg
output = [f"Reading parameter file: {filepath}"]
logger.info("Reading parameter file: %s", filepath)
with open(path, 'r') as f:
for line_num, raw_line in enumerate(f, start=1):
line = raw_line.strip()
if not line:
continue
if line.startswith('--'):
continue
parts = line.split()
if not parts:
continue
keyword = parts[0]
args = parts[1:]
ok = False
msg = ""
try:
if keyword == "ADC_clock_frequency_in_MHz":
freq = float(args[0])
ok = adc.set_clock_freq(freq)
msg = f"Set ADC clock frequency to {freq} MHz"
elif keyword == "ADC_data_delay":
delay = _parse_int(args[0])
ok = commands.set_adc_data_delay(delay)
msg = f"Set ADC data delay to 0x{delay:x}"
elif keyword == "chip_type":
if args[0] == "team1k":
ok = True
msg = "Chip type: team1k (confirmed)"
else:
msg = f"Unsupported chip type: {args[0]} (only team1k supported)"
ok = False
elif keyword == "exposure_mode":
mode = _parse_int(args[0])
ok = commands.set_exposure_mode(mode)
msg = f"Set exposure mode to {mode}"
elif keyword == "trigger_mode":
external = bool(_parse_int(args[0]))
polarity = bool(_parse_int(args[1])) if len(args) > 1 else False
ok = commands.set_trigger_mode(external, polarity)
msg = f"Set trigger mode: external={external}, polarity={polarity}"
elif keyword == "integration_time":
time_ms = float(args[0])
ok = commands.set_integration_time(time_ms)
msg = f"Set integration time to {time_ms} ms"
elif keyword == "digital_signal_polarity":
val = _parse_int(args[0])
ok = registers.write_register(27, val)
msg = f"Set digital signal polarity to {args[0]}"
elif keyword == "digital_signal_order_7downto0":
val = _parse_int(args[0])
ok = registers.write_register(28, val)
msg = f"Set digital signals 7:0 order to {args[0]}"
elif keyword == "digital_signal_order_15downto8":
val = _parse_int(args[0])
ok = registers.write_register(29, val)
msg = f"Set digital signals 15:8 order to {args[0]}"
elif keyword == "ADC_order_7downto0":
val = _parse_int(args[0])
ok = registers.write_register(30, val)
msg = f"Set ADC 7:0 order to {args[0]}"
elif keyword == "ADC_order_15downto8":
val = _parse_int(args[0])
ok = registers.write_register(31, val)
msg = f"Set ADC 15:8 order to {args[0]}"
elif keyword == "restart_daq":
if restart_daq_callback:
ok = restart_daq_callback()
else:
ok = True
msg = "Restarted DAQ"
else:
msg = f"Unknown keyword: {keyword}"
ok = False
except (IndexError, ValueError) as e:
msg = f"Error parsing line {line_num}: {line} ({e})"
ok = False
if ok:
output.append(f" {msg}")
else:
error_msg = f" Line {line_num}: {msg or f'Error: {line}'}"
output.append(error_msg)
logger.error("Parameter file error at line %d: %s", line_num, msg or line)
result = "\n".join(output)
return False, result
output.append(f"Done reading parameter file: {filepath}")
result = "\n".join(output)
logger.info("Parameter file applied successfully")
return True, result

View File

@@ -0,0 +1,217 @@
"""
Register read/write protocol over UDP.
Ports the register operations from NetworkInterface.cxx (lines 83-158).
Communicates with the detector's register port (default 42000).
"""
import struct
import logging
import threading
from .udp_transport import UDPSocket, SECURITY_KEY
logger = logging.getLogger(__name__)
# Register protocol constants
_WRITE_BIT = 0x80000000 # Bit 31 set = write operation
_ADDR_ECHO_MASK = 0x70000000 # Detector ORs this into echoed addresses
class RegisterError(Exception):
"""Error during register read/write."""
pass
class RegisterInterface:
"""
Register read/write over UDP.
Each transaction sends a packet with one or more register operations and
waits for the detector to echo the packet back with results filled in.
"""
DEFAULT_PORT = 42000
def __init__(self, detector_ip: str, port: int = DEFAULT_PORT):
self._socket = UDPSocket(detector_ip, port, recv_buffer_size=4096)
self._lock = threading.Lock()
def read_register(self, address: int) -> int:
"""
Read a single register.
Args:
address: Register address.
Returns:
Register value.
Raises:
RegisterError: On communication failure.
"""
values = self.read_registers([address])
return values[0]
def write_register(self, address: int, value: int) -> bool:
"""
Write a single register.
Args:
address: Register address.
value: Value to write.
Returns:
True on success.
Raises:
RegisterError: On communication failure.
"""
return self.write_registers([(address, value)])
def read_registers(self, addresses: list[int]) -> list[int]:
"""
Read multiple registers in one packet.
Args:
addresses: List of register addresses to read.
Returns:
List of register values (same order as addresses).
Raises:
RegisterError: On communication failure.
"""
entries = [(addr, 0) for addr in addresses]
response = self._transact(entries, write=False)
# Extract values from response
values = []
for i in range(len(addresses)):
offset = 8 + i * 8 + 4 # skip header(8) + addr(4) per entry
val = struct.unpack_from('<I', response, offset)[0]
values.append(val)
return values
def write_registers(self, address_value_pairs: list[tuple[int, int]]) -> bool:
"""
Write multiple registers in one packet.
Args:
address_value_pairs: List of (address, value) tuples.
Returns:
True on success.
Raises:
RegisterError: On communication failure.
"""
self._transact(address_value_pairs, write=True)
return True
def _transact(self, entries: list[tuple[int, int]], write: bool) -> bytes:
"""
Execute a register transaction (send packet, receive echo).
Packet format:
[security_key(u32), reserved(u32), {addr(u32), value(u32)}...]
For writes: addr has bit 31 set (0x80000000).
Minimum 2 register pairs per packet (padded if fewer).
"""
n = len(entries)
# Build request: pad to minimum 2 entries
padded = list(entries)
while len(padded) < 2:
padded.append((0, 0))
# Build packet
parts = [struct.pack('<II', SECURITY_KEY, 0)]
for addr, val in padded:
if write:
parts.append(struct.pack('<II', addr | _WRITE_BIT, val))
else:
parts.append(struct.pack('<II', addr, 0))
packet = b''.join(parts)
with self._lock:
nsent = self._socket.send(packet)
response = self._socket.recv(len(packet), timeout_sec=0.12)
if response is None:
self._socket.clear_buffer()
raise RegisterError("Timeout waiting for register response")
if len(response) != len(packet):
self._socket.clear_buffer()
raise RegisterError(
f"Response length mismatch: expected {len(packet)}, got {len(response)}"
)
# Validate response
self._validate_response(packet, response, entries, write)
return response
def _validate_response(self, request: bytes, response: bytes,
entries: list[tuple[int, int]], write: bool) -> None:
"""
Validate the echoed response against the sent request.
Checks:
1. Security key matches
2. Reserved field matches
3. For each register: address echoed with |0x70000000
4. For writes: value echoed back
"""
# Check security key and reserved field
req_key, req_reserved = struct.unpack_from('<II', request, 0)
resp_key, resp_reserved = struct.unpack_from('<II', response, 0)
if resp_key != req_key:
self._socket.clear_buffer()
raise RegisterError(
f"Security key mismatch: sent 0x{req_key:08x}, got 0x{resp_key:08x}"
)
if resp_reserved != req_reserved:
self._socket.clear_buffer()
raise RegisterError(
f"Reserved field mismatch: sent 0x{req_reserved:08x}, got 0x{resp_reserved:08x}"
)
# Check each register entry
for i in range(len(entries)):
offset = 8 + i * 8
req_addr = struct.unpack_from('<I', request, offset)[0]
req_val = struct.unpack_from('<I', request, offset + 4)[0]
resp_addr = struct.unpack_from('<I', response, offset)[0]
resp_val = struct.unpack_from('<I', response, offset + 4)[0]
# Address: detector echoes with |0x70000000
if (req_addr | _ADDR_ECHO_MASK) != resp_addr:
self._socket.clear_buffer()
raise RegisterError(
f"Register {i}: address mismatch, "
f"sent 0x{req_addr:08x}, expected echo 0x{req_addr | _ADDR_ECHO_MASK:08x}, "
f"got 0x{resp_addr:08x}"
)
# For writes: value must echo back
if write and (req_addr & _WRITE_BIT):
if req_val != resp_val:
self._socket.clear_buffer()
raise RegisterError(
f"Register {i}: write value mismatch, "
f"sent 0x{req_val:08x}, got 0x{resp_val:08x}"
)
def close(self) -> None:
"""Close the register interface."""
self._socket.close()
def __enter__(self):
return self
def __exit__(self, *args):
self.close()

View File

@@ -0,0 +1,128 @@
"""
Low-level UDP socket operations for detector communication.
Ports the socket management from NetworkInterface.cxx (lines 271-402).
"""
import select
import socket
import struct
import logging
logger = logging.getLogger(__name__)
SECURITY_KEY = 0x203b2d29 # " ;-)" — security key and endianness marker
class UDPSocket:
"""Manages a single UDP socket for communication with detector hardware."""
def __init__(self, dest_ip: str, dest_port: int, recv_buffer_size: int = 4096):
"""
Create a UDP socket bound to an OS-assigned local port.
Args:
dest_ip: Destination IP address (detector).
dest_port: Destination UDP port.
recv_buffer_size: Kernel receive buffer size in bytes.
"""
self._dest_addr = (dest_ip, dest_port)
self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._sock.bind(('', 0)) # INADDR_ANY, OS-assigned port
# Set receive buffer size
try:
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, recv_buffer_size)
except OSError as e:
logger.warning("Could not set SO_RCVBUF to %d: %s", recv_buffer_size, e)
actual = self._sock.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
logger.debug("Receive buffer size: requested=%d, actual=%d", recv_buffer_size, actual)
self._local_port = self._sock.getsockname()[1]
logger.debug("UDP socket bound to local port %d -> %s:%d",
self._local_port, dest_ip, dest_port)
@property
def local_port(self) -> int:
"""The OS-assigned local port number."""
return self._local_port
@property
def fileno(self) -> int:
"""File descriptor for select()."""
return self._sock.fileno()
def send(self, data: bytes | bytearray | memoryview) -> int:
"""
Send data to the configured destination.
Returns:
Number of bytes sent.
Raises:
OSError: On send failure.
"""
nsent = self._sock.sendto(data, self._dest_addr)
if nsent != len(data):
raise OSError(f"Sent {nsent} of {len(data)} bytes")
return nsent
def recv(self, max_size: int, timeout_sec: float = 0.12) -> bytes | None:
"""
Receive data with timeout.
Args:
max_size: Maximum bytes to receive.
timeout_sec: Timeout in seconds (0.12 = 120ms, matching C++ default).
Returns:
Received bytes, or None on timeout.
"""
ready, _, _ = select.select([self._sock], [], [], timeout_sec)
if not ready:
return None
data, _ = self._sock.recvfrom(max_size)
return data
def recv_into(self, buffer: bytearray | memoryview, timeout_sec: float = 0.12) -> int:
"""
Receive data directly into a pre-allocated buffer (zero-copy).
Args:
buffer: Buffer to receive into.
timeout_sec: Timeout in seconds.
Returns:
Number of bytes received, 0 on timeout.
"""
ready, _, _ = select.select([self._sock], [], [], timeout_sec)
if not ready:
return 0
return self._sock.recv_into(buffer)
def clear_buffer(self) -> None:
"""Drain all pending data from the socket (non-blocking)."""
logger.debug("Clearing socket buffer...")
self._sock.setblocking(False)
try:
while True:
try:
self._sock.recv(65536)
except BlockingIOError:
break
finally:
self._sock.setblocking(True)
def close(self) -> None:
"""Close the socket."""
if self._sock:
self._sock.close()
self._sock = None
def __enter__(self):
return self
def __exit__(self, *args):
self.close()

View File

@@ -0,0 +1 @@
from .writer import FileWriter

View File

@@ -0,0 +1,317 @@
"""
File writing from shared memory ring buffer.
Ports KTestFunctions::FileWritingThread from KTestFunctions.cxx (lines 1198-1241).
Supports both raw binary (backward compatible) and HDF5 formats.
"""
import os
import struct
import shutil
import logging
import threading
from pathlib import Path
import numpy as np
from ..detector.chip_config import ChipConfig, TEAM1K_CONFIG
logger = logging.getLogger(__name__)
# Minimum free disk space in MB before stopping writes
_MIN_FREE_SPACE_MB = 256
# Scan number persistence file
_SCAN_NUMBER_FILE = "/usr/local/k_test/parameters/last_scan_number.dat"
class FileWriter:
"""
Reads frames from locking_shmring and writes to disk.
Supports two formats:
- 'raw': Binary files matching the C++ k_test format (4-byte header + pixel data)
- 'hdf5': HDF5 files with chunked datasets and metadata
Runs as a background thread in the main process.
"""
def __init__(self, ring_name: str, chip_config: ChipConfig = TEAM1K_CONFIG):
self._ring_name = ring_name
self._config = chip_config
self._thread: threading.Thread | None = None
self._running = False
# Configuration (can be changed while running)
self._enabled = threading.Event()
self._output_dir = "/data"
self._format = "raw" # or "hdf5"
self._scan_number = self._load_scan_number()
self._lock = threading.Lock()
# Stats
self.frames_written: int = 0
self.bytes_written: int = 0
def _load_scan_number(self) -> int:
"""Load the last scan number from the persistent file."""
try:
with open(_SCAN_NUMBER_FILE, 'rb') as f:
data = f.read(4)
if len(data) == 4:
return struct.unpack('<I', data)[0]
except FileNotFoundError:
pass
except Exception as e:
logger.warning("Could not read scan number file: %s", e)
return 0
def _save_scan_number(self) -> None:
"""Save the current scan number to the persistent file."""
try:
os.makedirs(os.path.dirname(_SCAN_NUMBER_FILE), exist_ok=True)
with open(_SCAN_NUMBER_FILE, 'wb') as f:
f.write(struct.pack('<I', self._scan_number))
except Exception as e:
logger.warning("Could not save scan number file: %s", e)
@property
def scan_number(self) -> int:
return self._scan_number
@property
def output_dir(self) -> str:
return self._output_dir
@property
def file_format(self) -> str:
return self._format
@property
def is_enabled(self) -> bool:
return self._enabled.is_set()
def enable(self, output_dir: str | None = None, file_format: str | None = None) -> None:
"""
Enable file writing.
Args:
output_dir: Output directory (default: keep current).
file_format: File format, 'raw' or 'hdf5' (default: keep current).
"""
with self._lock:
if output_dir is not None:
self._output_dir = output_dir
if file_format is not None:
if file_format not in ('raw', 'hdf5'):
raise ValueError(f"Unsupported format: {file_format}")
self._format = file_format
# Increment scan number
self._scan_number += 1
self._save_scan_number()
# Create scan directory
scan_dir = os.path.join(self._output_dir,
f"scan_{self._scan_number:010d}")
os.makedirs(scan_dir, exist_ok=True)
self._enabled.set()
logger.info("File writing enabled: dir=%s, format=%s, scan=%d",
self._output_dir, self._format, self._scan_number)
def disable(self) -> None:
"""Disable file writing."""
self._enabled.clear()
logger.info("File writing disabled")
def start(self) -> None:
"""Start the writer thread."""
if self._thread and self._thread.is_alive():
return
self._running = True
self._thread = threading.Thread(target=self._writer_loop, daemon=True,
name="team1k-filewriter")
self._thread.start()
logger.info("File writer thread started")
def stop(self) -> None:
"""Stop the writer thread."""
self._running = False
self._enabled.clear()
if self._thread:
self._thread.join(timeout=5.0)
logger.info("File writer thread stopped")
def _check_disk_space(self) -> bool:
"""Check for sufficient free disk space."""
try:
stat = shutil.disk_usage(self._output_dir)
free_mb = stat.free / (1024 * 1024)
if free_mb < _MIN_FREE_SPACE_MB:
logger.error("Disk almost full (%.0f MB free), stopping writes", free_mb)
return False
return True
except Exception as e:
logger.error("Could not check disk space: %s", e)
return False
def _writer_loop(self) -> None:
"""Main writer loop reading from shmring."""
from locking_shmring import RingBufferReader
reader = RingBufferReader(self._ring_name)
logger.info("File writer connected to ring buffer '%s'", self._ring_name)
raw_fp = None
h5_file = None
h5_dataset = None
file_number = 0
transfers_in_file = 0
ntransfers_per_file = self._config.ntransfers_per_file
current_scan = 0
try:
for slot in reader.iter_frames(timeout_ms=200):
if not self._running:
slot.release()
break
with slot:
if not self._enabled.is_set():
# Close any open files
if raw_fp:
raw_fp.close()
raw_fp = None
if h5_file:
h5_file.close()
h5_file = None
h5_dataset = None
file_number = 0
transfers_in_file = 0
continue
# Check if scan number changed (new enable call)
with self._lock:
scan = self._scan_number
fmt = self._format
out_dir = self._output_dir
if scan != current_scan:
# Close old files
if raw_fp:
raw_fp.close()
raw_fp = None
if h5_file:
h5_file.close()
h5_file = None
h5_dataset = None
file_number = 0
transfers_in_file = 0
current_scan = scan
# Check disk space periodically
if transfers_in_file == 0 and not self._check_disk_space():
self._enabled.clear()
continue
frame = slot.data_as_numpy(
(self._config.image_ny, self._config.image_nx), np.uint16
)
if fmt == "raw":
raw_fp, file_number, transfers_in_file = self._write_raw(
frame, slot.frame_id, raw_fp, out_dir, scan,
file_number, transfers_in_file, ntransfers_per_file
)
elif fmt == "hdf5":
h5_file, h5_dataset = self._write_hdf5(
frame, slot.frame_id, slot.timestamp_ns,
h5_file, h5_dataset, out_dir, scan, file_number
)
self.frames_written += 1
self.bytes_written += self._config.image_size_bytes
except Exception as e:
logger.error("File writer error: %s", e)
finally:
if raw_fp:
raw_fp.close()
if h5_file:
h5_file.close()
reader.close()
def _write_raw(self, frame: np.ndarray, frame_id: int,
fp, out_dir: str, scan: int,
file_number: int, transfers_in_file: int,
ntransfers_per_file: int):
"""Write a frame in raw binary format (backward compatible with C++ k_test)."""
if transfers_in_file == 0:
if fp:
fp.close()
filename = os.path.join(
out_dir,
f"scan_{scan:010d}",
f"team1k_ued_{scan:010d}_{file_number:010d}.dat"
)
fp = open(filename, 'wb')
# Write first image number header
fp.write(struct.pack('<I', frame_id))
fp.write(frame.tobytes())
fp.flush()
transfers_in_file += 1
if transfers_in_file >= ntransfers_per_file:
transfers_in_file = 0
file_number += 1
return fp, file_number, transfers_in_file
def _write_hdf5(self, frame: np.ndarray, frame_id: int, timestamp_ns: int,
h5_file, h5_dataset, out_dir: str, scan: int, file_number: int):
"""Write a frame in HDF5 format."""
try:
import h5py
except ImportError:
logger.error("h5py not installed, cannot write HDF5 files")
self._enabled.clear()
return None, None
if h5_file is None:
filename = os.path.join(
out_dir,
f"scan_{scan:010d}",
f"team1k_ued_{scan:010d}_{file_number:010d}.h5"
)
h5_file = h5py.File(filename, 'w')
h5_dataset = h5_file.create_dataset(
'data',
shape=(0, self._config.image_ny, self._config.image_nx),
maxshape=(None, self._config.image_ny, self._config.image_nx),
dtype=np.uint16,
chunks=(1, self._config.image_ny, self._config.image_nx),
)
h5_file.create_dataset('timestamps', shape=(0,), maxshape=(None,),
dtype=np.float64)
h5_file.create_dataset('frame_ids', shape=(0,), maxshape=(None,),
dtype=np.uint64)
h5_file.attrs['scan_number'] = scan
# Append frame
n = h5_dataset.shape[0]
h5_dataset.resize(n + 1, axis=0)
h5_dataset[n] = frame
ts_ds = h5_file['timestamps']
ts_ds.resize(n + 1, axis=0)
ts_ds[n] = timestamp_ns / 1e9
fid_ds = h5_file['frame_ids']
fid_ds.resize(n + 1, axis=0)
fid_ds[n] = frame_id
h5_file.flush()
return h5_file, h5_dataset

View File

@@ -0,0 +1,2 @@
from .bellow_stage import BellowStage
from .power_supply import PowerSupply

View File

@@ -0,0 +1,92 @@
"""
Camera bellow stage control via serial port.
Extracted from the existing server.py.
"""
import time
import logging
import serial
logger = logging.getLogger(__name__)
DEFAULT_PORT = '/dev/CameraBellowStage'
DEFAULT_BAUDRATE = 9600
class BellowStage:
"""Controls the camera bellow stage motor via serial port."""
def __init__(self, port: str = DEFAULT_PORT, baudrate: int = DEFAULT_BAUDRATE):
self._port = port
self._baudrate = baudrate
self.position: int = 0
self.is_moving: bool = False
def move(self, insert: bool) -> tuple[bool, str | None]:
"""
Move the bellow stage.
Args:
insert: True to insert, False to retract.
Returns:
Tuple of (success, error_message).
"""
action = "Inserting" if insert else "Retracting"
logger.info("%s detector", action)
self.is_moving = True
try:
with serial.Serial(self._port, self._baudrate, timeout=1) as ser:
time.sleep(0.5)
ser.write(b'\r')
time.sleep(0.5)
ser.write(b'\x03') # Ctrl-C to stop any ongoing movement
time.sleep(0.5)
ser.write(b'rc=100\r')
time.sleep(0.5)
if insert:
ser.write(b'vm=100000\r')
time.sleep(0.5)
ser.write(b'mr 2000000\r')
else:
ser.write(b'vm=100001\r')
time.sleep(0.5)
ser.write(b'mr -2000000\r')
# Wait for movement
time.sleep(10)
# Read position
ser.write(b'pr p\r')
time.sleep(0.5)
response_bytes = ser.read_all()
response = response_bytes.decode('utf-8', errors='ignore') if response_bytes else ""
for line in response.splitlines():
if line.strip().startswith('p='):
try:
self.position = int(line.strip()[2:])
except ValueError:
pass
break
self.is_moving = False
logger.info("Detector %s successfully (position=%d)",
"inserted" if insert else "retracted", self.position)
return True, None
except serial.SerialException as e:
self.is_moving = False
error = f"Serial port error: {e}"
logger.error(error)
return False, error
except Exception as e:
self.is_moving = False
error = f"Bellow stage error: {e}"
logger.error(error)
return False, error

View File

@@ -0,0 +1,37 @@
"""
Power supply control.
Extracted from the existing server.py.
"""
import logging
logger = logging.getLogger(__name__)
class PowerSupply:
"""Controls the detector power supply."""
def __init__(self):
self.is_on: bool = False
self.voltages: list[float] = []
def turn_on(self) -> tuple[bool, str | None]:
"""Turn on the power supply."""
# TODO: Implement actual power supply control
# The original server.py had placeholder serial port communication
self.is_on = True
logger.info("Power supply turned on")
return True, None
def turn_off(self) -> tuple[bool, str | None]:
"""Turn off the power supply."""
self.is_on = False
logger.info("Power supply turned off")
return True, None
def read_voltages(self) -> tuple[list[float] | None, str | None]:
"""Read voltage values from the power supply."""
# TODO: Implement actual voltage reading
logger.info("Reading power supply voltages")
return self.voltages, None

View File

@@ -0,0 +1,2 @@
from .interface import PVAInterface
from .streamer import PVAStreamer

220
src/team1k/pva/interface.py Normal file
View File

@@ -0,0 +1,220 @@
"""
PVA server setup, command PVs, and status PVs.
Replaces all ZMQ communication (REQ/REP for commands, PUB for status)
with EPICS PV Access channels.
"""
import logging
import threading
from typing import TYPE_CHECKING, Any
import numpy as np
from p4p import Value
from p4p.server import Server, ServerOperation
from p4p.server.thread import SharedPV
from p4p.nt import NTScalar, NTNDArray
if TYPE_CHECKING:
from ..server import Team1kServer
logger = logging.getLogger(__name__)
class _ReadOnlyHandler:
"""Handler for read-only status PVs."""
def put(self, pv: SharedPV, op: ServerOperation):
op.done(error="PV is read-only")
def rpc(self, pv: SharedPV, op: ServerOperation):
op.done(error="RPC not supported")
class _CommandHandler:
"""
Handler for writable command PVs.
When a client puts a value, the handler dispatches to the server.
"""
def __init__(self, server: 'Team1kServer', command_name: str):
self._server = server
self._command = command_name
def put(self, pv: SharedPV, op: ServerOperation):
try:
raw = op.value()
# Extract the actual value from the NTScalar wrapper
value = raw.value if hasattr(raw, 'value') else raw
result = self._server.execute_command(self._command, value)
pv.post(raw)
op.done()
except Exception as e:
logger.error("Command '%s' failed: %s", self._command, e)
op.done(error=str(e))
class PVAInterface:
"""
EPICS PV Access server for commands, status, and data streaming.
PV layout:
Status (read-only):
{prefix}STATUS - string array of status lines
{prefix}ACQUIRING - bool, DAQ running
{prefix}FRAME_RATE - float, current frame rate Hz
{prefix}FRAME_COUNT - int, total frames acquired
{prefix}EXPOSURE_MODE - int, current exposure mode
{prefix}TRIGGER_MODE - int, current trigger mode
{prefix}INTEGRATION_TIME - float, integration time ms
{prefix}FILE_WRITING - bool, file writing enabled
{prefix}OUTPUT_DIR - string, output directory
{prefix}SCAN_NUMBER - int, current scan number
Commands (writable):
{prefix}CMD:EXPOSURE_MODE - int, set exposure mode
{prefix}CMD:TRIGGER_MODE - int, set trigger mode
{prefix}CMD:INTEGRATION_TIME - float, set integration time ms
{prefix}CMD:START_DAQ - int, 1=start, 0=stop
{prefix}CMD:FILE_WRITING - int, 1=enable, 0=disable
{prefix}CMD:OUTPUT_DIR - string, set output directory
{prefix}CMD:FILE_FORMAT - string, "raw" or "hdf5"
{prefix}CMD:ADC_CLOCK_FREQ - float, set ADC clock MHz
{prefix}CMD:ADC_DATA_DELAY - int, set ADC data delay
{prefix}CMD:PARAMETER_FILE - string, apply parameter file
{prefix}CMD:RESET - int, 1=reset detector connection
{prefix}CMD:TEST_MODE - int, 0/1
{prefix}CMD:REGISTER_READ - int, read register at address
{prefix}CMD:REGISTER_WRITE - int array [address, value]
Data:
{prefix}IMAGE - NTNDArray, live image stream
Peripherals:
{prefix}BELLOW:CMD - int, 1=insert, 0=retract
{prefix}BELLOW:POSITION - int, current position
{prefix}POWER:CMD - int, 1=on, 0=off
{prefix}POWER:VOLTAGES - float array, voltage readings
"""
def __init__(self, server: 'Team1kServer', prefix: str = "TEAM1K:"):
self._server = server
self._prefix = prefix
self._pvs: dict[str, SharedPV] = {}
self._pva_server: Server | None = None
self._ro_handler = _ReadOnlyHandler()
@property
def image_pv(self) -> SharedPV:
return self._pvs[f"{self._prefix}IMAGE"]
def _make_status_pv(self, nt, initial) -> SharedPV:
"""Create a read-only status PV."""
pv = SharedPV(handler=self._ro_handler, nt=nt, initial=initial)
return pv
def _make_command_pv(self, nt, initial, command_name: str) -> SharedPV:
"""Create a writable command PV."""
handler = _CommandHandler(self._server, command_name)
pv = SharedPV(handler=handler, nt=nt, initial=initial)
return pv
def setup(self) -> None:
"""Create all PVs and start the PVA server."""
p = self._prefix
# --- Status PVs (read-only) ---
self._pvs[f"{p}STATUS"] = self._make_status_pv(
NTScalar('as'), ['INITIALIZING'])
self._pvs[f"{p}ACQUIRING"] = self._make_status_pv(
NTScalar('?'), False)
self._pvs[f"{p}FRAME_RATE"] = self._make_status_pv(
NTScalar('d'), 0.0)
self._pvs[f"{p}FRAME_COUNT"] = self._make_status_pv(
NTScalar('l'), 0)
self._pvs[f"{p}EXPOSURE_MODE"] = self._make_status_pv(
NTScalar('i'), 0)
self._pvs[f"{p}TRIGGER_MODE"] = self._make_status_pv(
NTScalar('i'), 0)
self._pvs[f"{p}INTEGRATION_TIME"] = self._make_status_pv(
NTScalar('d'), 0.0)
self._pvs[f"{p}FILE_WRITING"] = self._make_status_pv(
NTScalar('?'), False)
self._pvs[f"{p}OUTPUT_DIR"] = self._make_status_pv(
NTScalar('s'), "/data")
self._pvs[f"{p}SCAN_NUMBER"] = self._make_status_pv(
NTScalar('l'), 0)
# --- Command PVs (writable) ---
self._pvs[f"{p}CMD:EXPOSURE_MODE"] = self._make_command_pv(
NTScalar('i'), 0, "set_exposure_mode")
self._pvs[f"{p}CMD:TRIGGER_MODE"] = self._make_command_pv(
NTScalar('i'), 0, "set_trigger_mode")
self._pvs[f"{p}CMD:INTEGRATION_TIME"] = self._make_command_pv(
NTScalar('d'), 0.0, "set_integration_time")
self._pvs[f"{p}CMD:START_DAQ"] = self._make_command_pv(
NTScalar('i'), 0, "start_stop_daq")
self._pvs[f"{p}CMD:FILE_WRITING"] = self._make_command_pv(
NTScalar('i'), 0, "set_file_writing")
self._pvs[f"{p}CMD:OUTPUT_DIR"] = self._make_command_pv(
NTScalar('s'), "", "set_output_dir")
self._pvs[f"{p}CMD:FILE_FORMAT"] = self._make_command_pv(
NTScalar('s'), "raw", "set_file_format")
self._pvs[f"{p}CMD:ADC_CLOCK_FREQ"] = self._make_command_pv(
NTScalar('d'), 60.0, "set_adc_clock_freq")
self._pvs[f"{p}CMD:ADC_DATA_DELAY"] = self._make_command_pv(
NTScalar('i'), 0, "set_adc_data_delay")
self._pvs[f"{p}CMD:PARAMETER_FILE"] = self._make_command_pv(
NTScalar('s'), "", "apply_parameter_file")
self._pvs[f"{p}CMD:RESET"] = self._make_command_pv(
NTScalar('i'), 0, "reset_connection")
self._pvs[f"{p}CMD:TEST_MODE"] = self._make_command_pv(
NTScalar('i'), 0, "set_test_mode")
# --- Data PV ---
self._pvs[f"{p}IMAGE"] = SharedPV(
handler=self._ro_handler,
nt=NTNDArray(),
initial=np.zeros((self._server.chip_config.image_ny,
self._server.chip_config.image_nx), dtype=np.uint16),
)
# --- Peripheral PVs ---
self._pvs[f"{p}BELLOW:CMD"] = self._make_command_pv(
NTScalar('i'), 0, "bellow_move")
self._pvs[f"{p}BELLOW:POSITION"] = self._make_status_pv(
NTScalar('i'), 0)
self._pvs[f"{p}POWER:CMD"] = self._make_command_pv(
NTScalar('i'), 0, "power_control")
self._pvs[f"{p}POWER:VOLTAGES"] = self._make_status_pv(
NTScalar('ad'), np.zeros(0, dtype=np.float64))
# Start the PVA server
self._pva_server = Server(providers=[self._pvs])
logger.info("PVA server started with %d PVs (prefix=%s)",
len(self._pvs), self._prefix)
def post_status(self, name: str, value: Any) -> None:
"""
Update a status PV.
Args:
name: PV name suffix (without prefix), e.g., 'FRAME_RATE'.
value: New value to post.
"""
full_name = f"{self._prefix}{name}"
pv = self._pvs.get(full_name)
if pv:
try:
pv.post(value)
except Exception as e:
logger.debug("Could not post to %s: %s", full_name, e)
def stop(self) -> None:
"""Stop the PVA server."""
if self._pva_server:
self._pva_server.stop()
self._pva_server = None
logger.info("PVA server stopped")

130
src/team1k/pva/streamer.py Normal file
View File

@@ -0,0 +1,130 @@
"""
PVA data streamer — reads frames from shmring and posts to NTNDArray PV.
Runs as a background thread in the main process.
"""
import time
import logging
import threading
import numpy as np
from p4p import Value
from p4p.server.thread import SharedPV
from p4p.nt import NTNDArray
from ..detector.chip_config import ChipConfig, TEAM1K_CONFIG
logger = logging.getLogger(__name__)
class PVAStreamer:
"""
Reads frames from locking_shmring and posts to a PVA NTNDArray channel.
The streamer naturally drops frames if the PVA channel is slower than
acquisition — the ring buffer reader simply skips ahead and reports
frames_skipped.
"""
def __init__(self, ring_name: str, image_pv: SharedPV,
chip_config: ChipConfig = TEAM1K_CONFIG):
self._ring_name = ring_name
self._image_pv = image_pv
self._config = chip_config
self._thread: threading.Thread | None = None
self._running = False
# Stats
self.frame_count: int = 0
self.frames_skipped: int = 0
self.frame_rate: float = 0.0
# Callbacks for updating status PVs
self._frame_count_callback = None
self._frame_rate_callback = None
def set_callbacks(self, frame_count_cb=None, frame_rate_cb=None):
"""Set callbacks for updating external status PVs."""
self._frame_count_callback = frame_count_cb
self._frame_rate_callback = frame_rate_cb
def start(self) -> None:
"""Start the streamer thread."""
if self._thread and self._thread.is_alive():
return
self._running = True
self._thread = threading.Thread(target=self._streamer_loop, daemon=True,
name="team1k-pva-streamer")
self._thread.start()
logger.info("PVA streamer thread started")
def stop(self) -> None:
"""Stop the streamer thread."""
self._running = False
if self._thread:
self._thread.join(timeout=5.0)
logger.info("PVA streamer thread stopped")
def _streamer_loop(self) -> None:
"""Main loop reading from shmring and posting to PVA."""
from locking_shmring import RingBufferReader
reader = RingBufferReader(self._ring_name)
logger.info("PVA streamer connected to ring buffer '%s'", self._ring_name)
last_time = time.monotonic()
frames_since_last = 0
nt = NTNDArray()
try:
for slot in reader.iter_frames(timeout_ms=200):
if not self._running:
slot.release()
break
with slot:
self.frame_count += 1
self.frames_skipped += slot.frames_skipped
frames_since_last += 1
# Get numpy view of frame data
frame = slot.data_as_numpy(
(self._config.image_ny, self._config.image_nx), np.uint16
)
# Build NTNDArray value and post
height, width = frame.shape
frame_array = {
'value': frame.ravel(),
'dimension': [
{'size': height, 'offset': 0, 'fullSize': height,
'binning': 1, 'reverse': False},
{'size': width, 'offset': 0, 'fullSize': width,
'binning': 1, 'reverse': False},
],
}
try:
self._image_pv.post(Value(nt, frame_array))
except Exception as e:
logger.debug("PVA post error (no clients?): %s", e)
# Update frame rate every second
now = time.monotonic()
dt = now - last_time
if dt >= 1.0:
self.frame_rate = frames_since_last / dt
frames_since_last = 0
last_time = now
if self._frame_rate_callback:
self._frame_rate_callback(self.frame_rate)
if self._frame_count_callback:
self._frame_count_callback(self.frame_count)
except Exception as e:
logger.error("PVA streamer error: %s", e)
finally:
reader.close()

File diff suppressed because it is too large Load Diff