From 08135c6c136fb98aae23519c84e39da8634afa79 Mon Sep 17 00:00:00 2001 From: Sebastian Strempfer Date: Fri, 27 Feb 2026 01:14:08 -0600 Subject: [PATCH] Claude v1 --- pyproject.toml | 6 +- src/team1k/Client.py | 833 +++++-------- src/team1k/KTestClient.py | 191 --- src/team1k/__init__.py | 12 +- src/team1k/acquisition/__init__.py | 1 + src/team1k/acquisition/receiver.py | 366 ++++++ src/team1k/detector/__init__.py | 7 + src/team1k/detector/adc.py | 231 ++++ src/team1k/detector/chip_config.py | 88 ++ src/team1k/detector/commands.py | 170 +++ src/team1k/detector/data_port.py | 146 +++ src/team1k/detector/parameter_file.py | 179 +++ src/team1k/detector/registers.py | 217 ++++ src/team1k/detector/udp_transport.py | 128 ++ src/team1k/filewriter/__init__.py | 1 + src/team1k/filewriter/writer.py | 317 +++++ src/team1k/peripherals/__init__.py | 2 + src/team1k/peripherals/bellow_stage.py | 92 ++ src/team1k/peripherals/power_supply.py | 37 + src/team1k/pva/__init__.py | 2 + src/team1k/pva/interface.py | 220 ++++ src/team1k/pva/streamer.py | 130 ++ src/team1k/server.py | 1522 ++++++------------------ 23 files changed, 2989 insertions(+), 1909 deletions(-) delete mode 100644 src/team1k/KTestClient.py create mode 100644 src/team1k/acquisition/__init__.py create mode 100644 src/team1k/acquisition/receiver.py create mode 100644 src/team1k/detector/__init__.py create mode 100644 src/team1k/detector/adc.py create mode 100644 src/team1k/detector/chip_config.py create mode 100644 src/team1k/detector/commands.py create mode 100644 src/team1k/detector/data_port.py create mode 100644 src/team1k/detector/parameter_file.py create mode 100644 src/team1k/detector/registers.py create mode 100644 src/team1k/detector/udp_transport.py create mode 100644 src/team1k/filewriter/__init__.py create mode 100644 src/team1k/filewriter/writer.py create mode 100644 src/team1k/peripherals/__init__.py create mode 100644 src/team1k/peripherals/bellow_stage.py create mode 100644 
src/team1k/peripherals/power_supply.py create mode 100644 src/team1k/pva/__init__.py create mode 100644 src/team1k/pva/interface.py create mode 100644 src/team1k/pva/streamer.py diff --git a/pyproject.toml b/pyproject.toml index 9dd4f82..0a29cf8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,10 +14,12 @@ requires-python = ">=3.9" dependencies = [ "numpy >= 1.24.0", "pyserial >= 3.5", - "pydantic >= 2.0.0", - "typing-extensions >= 4.5.0", + "p4p >= 4.1.0", ] +[project.optional-dependencies] +hdf5 = ["h5py >= 3.0.0"] + [project.scripts] team1k-server = "team1k.server:main" diff --git a/src/team1k/Client.py b/src/team1k/Client.py index 8d1ed28..1e8fdf4 100644 --- a/src/team1k/Client.py +++ b/src/team1k/Client.py @@ -1,598 +1,297 @@ #!/usr/bin/env python3 """ -Team1k Client - Interface to the Team1k detector control server. +Team1k Client — PVA-based interface to the Team1k detector server. + +Replaces the previous ZMQ-based client with EPICS PV Access communication. +All commands are sent via pvput to command PVs, and status/data are +received via pvget or monitor subscriptions. 
""" -import uuid -import time -import socket -import asyncio +import enum +import logging import threading -from typing import Dict, List, Any, Optional, Tuple, Union, Callable +from typing import Any, Callable -import zmq -import zmq.asyncio +import numpy as np -from .KTestClient import KTestClient, KTestCommand, KTestError, ExposureModes, TriggerModes +from p4p.client.thread import Context, Subscription + +logger = logging.getLogger(__name__) -class CommandException(Exception): - """Exception raised when a command fails on the server side.""" +class ExposureModes(enum.IntEnum): + """Detector exposure modes.""" + ROLLING_SHUTTER = 0 + GLOBAL_SHUTTER = 1 + GLOBAL_SHUTTER_FLUSH = 2 + GLOBAL_SHUTTER_CDS = 3 + + +class TriggerModes(enum.IntEnum): + """Detector trigger modes.""" + INTERNAL = 0 + EXTERNAL = 1 + + +class CommandError(Exception): + """Raised when a PVA command put fails.""" pass class Client: """ - Client for the Team1k server. - - Provides methods to send commands and receive status updates. + PVA client for the Team1k detector server. + + All communication uses EPICS PV Access: + - Commands: pvput to {prefix}CMD:* PVs + - Status: pvget from {prefix}* status PVs + - Data: monitor on {prefix}IMAGE PV + + Args: + prefix: PV name prefix (default: "TEAM1K:"). + timeout: Default timeout in seconds for PVA operations. 
""" - - def __init__(self, host: str = 'localhost', cmd_port: int = 42003, pub_port: int = 42004): + + def __init__(self, prefix: str = "TEAM1K:", timeout: float = 5.0): + self._prefix = prefix + self._timeout = timeout + self._ctx = Context("pva") + self._subscriptions: dict[str, Subscription] = {} + + def _pv(self, name: str) -> str: + """Build full PV name.""" + return f"{self._prefix}{name}" + + def _put(self, pv_suffix: str, value: Any) -> None: + """Put a value to a command PV.""" + pv_name = self._pv(pv_suffix) + try: + self._ctx.put(pv_name, value, timeout=self._timeout) + except Exception as e: + raise CommandError(f"Failed to put {pv_name}={value}: {e}") from e + + def _get(self, pv_suffix: str) -> Any: + """Get the current value of a PV.""" + pv_name = self._pv(pv_suffix) + try: + return self._ctx.get(pv_name, timeout=self._timeout) + except Exception as e: + raise CommandError(f"Failed to get {pv_name}: {e}") from e + + # --- Detector commands --- + + def set_exposure_mode(self, mode: ExposureModes | int) -> None: + """Set the detector exposure mode (0-3).""" + self._put("CMD:EXPOSURE_MODE", int(mode)) + + def set_trigger_mode(self, mode: TriggerModes | int) -> None: + """Set the trigger mode (0=internal, 1=external).""" + self._put("CMD:TRIGGER_MODE", int(mode)) + + def set_integration_time(self, time_ms: float) -> None: + """Set the integration time in milliseconds.""" + self._put("CMD:INTEGRATION_TIME", time_ms) + + def start_daq(self) -> None: + """Start data acquisition.""" + self._put("CMD:START_DAQ", 1) + + def stop_daq(self) -> None: + """Stop data acquisition.""" + self._put("CMD:START_DAQ", 0) + + def set_file_writing(self, enable: bool) -> None: + """Enable or disable file writing.""" + self._put("CMD:FILE_WRITING", 1 if enable else 0) + + def set_output_dir(self, path: str) -> None: + """Set the output directory for file writing.""" + self._put("CMD:OUTPUT_DIR", path) + + def set_file_format(self, fmt: str) -> None: + """Set file format: 
'raw' or 'hdf5'.""" + self._put("CMD:FILE_FORMAT", fmt) + + def set_adc_clock_freq(self, freq_mhz: float) -> None: + """Set ADC clock frequency in MHz (50-100).""" + self._put("CMD:ADC_CLOCK_FREQ", freq_mhz) + + def set_adc_data_delay(self, delay: int) -> None: + """Set ADC data delay.""" + self._put("CMD:ADC_DATA_DELAY", delay) + + def load_parameter_file(self, file_path: str) -> None: + """Apply a parameter file on the server.""" + self._put("CMD:PARAMETER_FILE", file_path) + + def reset_connection(self) -> None: + """Reset the detector connection (stop DAQ, reinitialize).""" + self._put("CMD:RESET", 1) + + def set_test_mode(self, enable: bool) -> None: + """Enable or disable FPGA test data generation.""" + self._put("CMD:TEST_MODE", 1 if enable else 0) + + # --- Peripheral commands --- + + def insert_detector(self) -> None: + """Insert the detector (move bellow stage).""" + self._put("BELLOW:CMD", 1) + + def retract_detector(self) -> None: + """Retract the detector (move bellow stage).""" + self._put("BELLOW:CMD", 0) + + def power_on(self) -> None: + """Turn on the detector power supply.""" + self._put("POWER:CMD", 1) + + def power_off(self) -> None: + """Turn off the detector power supply.""" + self._put("POWER:CMD", 0) + + # --- Status getters --- + + def get_status(self) -> list[str]: + """Get server status strings.""" + val = self._get("STATUS") + return list(val.value) if hasattr(val, 'value') else list(val) + + def is_acquiring(self) -> bool: + """Check if DAQ is running.""" + val = self._get("ACQUIRING") + return bool(val.value if hasattr(val, 'value') else val) + + def get_frame_rate(self) -> float: + """Get current frame rate in Hz.""" + val = self._get("FRAME_RATE") + return float(val.value if hasattr(val, 'value') else val) + + def get_frame_count(self) -> int: + """Get total frames acquired.""" + val = self._get("FRAME_COUNT") + return int(val.value if hasattr(val, 'value') else val) + + def get_exposure_mode(self) -> int: + """Get current exposure 
mode.""" + val = self._get("EXPOSURE_MODE") + return int(val.value if hasattr(val, 'value') else val) + + def get_trigger_mode(self) -> int: + """Get current trigger mode.""" + val = self._get("TRIGGER_MODE") + return int(val.value if hasattr(val, 'value') else val) + + def get_integration_time(self) -> float: + """Get current integration time in ms.""" + val = self._get("INTEGRATION_TIME") + return float(val.value if hasattr(val, 'value') else val) + + def is_file_writing(self) -> bool: + """Check if file writing is enabled.""" + val = self._get("FILE_WRITING") + return bool(val.value if hasattr(val, 'value') else val) + + def get_output_dir(self) -> str: + """Get current output directory.""" + val = self._get("OUTPUT_DIR") + return str(val.value if hasattr(val, 'value') else val) + + def get_scan_number(self) -> int: + """Get current scan number.""" + val = self._get("SCAN_NUMBER") + return int(val.value if hasattr(val, 'value') else val) + + def get_bellow_position(self) -> int: + """Get bellow stage position.""" + val = self._get("BELLOW:POSITION") + return int(val.value if hasattr(val, 'value') else val) + + def get_power_voltages(self) -> np.ndarray: + """Get power supply voltage readings.""" + val = self._get("POWER:VOLTAGES") + return np.asarray(val.value if hasattr(val, 'value') else val) + + # --- Monitors (live subscriptions) --- + + def monitor_image(self, callback: Callable[[np.ndarray, int], None]) -> None: """ - Initialize the Team1k client. - + Subscribe to live image updates. + + The callback receives (image_array, frame_count) for each new frame. + Only one image monitor can be active at a time. + Args: - host: The hostname of the server - cmd_port: The command socket port - pub_port: The status publication port + callback: Function called with (numpy array, frame_count) for each frame. 
""" - self.host = host - self.cmd_port = cmd_port - self.pub_port = pub_port - - # Set up ZMQ for commands - self.context = zmq.Context() - self.cmd_socket = self.context.socket(zmq.REQ) - self.cmd_socket.connect(f"tcp://{host}:{cmd_port}") - - # Status subscriber - self.sub_socket = None - self._status_thread = None - self._status_callback = None - self._status_running = False - self._last_status = None - - def send_command(self, command: Union[str, KTestCommand], *args) -> Any: - """ - Send a command to the server. - - Args: - command: A string command or KTestCommand enum - *args: Arguments for the command - - Returns: - The server response - - Raises: - CommandException: If the command fails on the server side - Exception: For other errors - """ - # Format command string - if isinstance(command, KTestCommand): - cmd_str = f"{command.value.lower()} {' '.join(str(arg) for arg in args)}" - else: - cmd_str = f"{command} {' '.join(str(arg) for arg in args)}" - - # Send command - self.cmd_socket.send_string(cmd_str.strip()) - - # Get response - response = self.cmd_socket.recv_pyobj() - - # Handle exceptions from the server - if isinstance(response, Exception): - if isinstance(response, (ValueError, RuntimeError)): - raise CommandException(str(response)) - else: - raise response - - return response - - def subscribe_to_status(self, callback: Callable[[Dict[str, Any]], None]) -> None: - """ - Subscribe to status updates. 
- - Args: - callback: Function to call with status updates - """ - # Stop existing subscription if there is one - self.unsubscribe_from_status() - - # Create a ZMQ SUB socket - sub_socket = self.context.socket(zmq.SUB) - sub_socket.setsockopt_string(zmq.SUBSCRIBE, '') - sub_socket.connect(f"tcp://{self.host}:{self.pub_port}") - - # Set up status thread - self.sub_socket = sub_socket - self._status_callback = callback - self._status_running = True - self._status_thread = threading.Thread(target=self._status_listener, daemon=True) - self._status_thread.start() - - def unsubscribe_from_status(self) -> None: - """Stop receiving status updates.""" - if not self._status_running: - return - - # Stop the status thread - self._status_running = False - if self._status_thread: - self._status_thread.join(timeout=1.0) - - # Close the socket - if self.sub_socket: - self.sub_socket.close() - self.sub_socket = None - - def _status_listener(self) -> None: - """Background thread to listen for status updates.""" - poller = zmq.Poller() - poller.register(self.sub_socket, zmq.POLLIN) - - while self._status_running: + self.stop_monitor("IMAGE") + + pv_name = self._pv("IMAGE") + + def _on_image(value): try: - socks = dict(poller.poll(timeout=1000)) - if self.sub_socket in socks: - status = self.sub_socket.recv_pyobj() - - # Store status - self._last_status = status - - # Call callback - if self._status_callback: - self._status_callback(status) + arr = value.value + # NTNDArray stores shape info in the dimension field + if hasattr(value, 'dimension') and value.dimension: + dims = value.dimension + shape = tuple(d.size for d in dims) + arr = arr.reshape(shape) + callback(arr, 0) except Exception as e: - print(f"Error in status listener: {e}") - - def get_status(self) -> Dict[str, Any]: + logger.debug("Image monitor error: %s", e) + + self._subscriptions["IMAGE"] = self._ctx.monitor(pv_name, _on_image) + + def monitor_status(self, pv_name: str, + callback: Callable[[Any], None]) -> None: 
""" - Get the current status from the server. - - Returns: - The current system status - """ - if self._last_status is not None: - return self._last_status - - return self.send_command("get_status") - - def insert_detector(self) -> str: - """ - Insert the detector into the beam. - - Returns: - Success message - """ - return self.send_command("insert") - - def retract_detector(self) -> str: - """ - Retract the detector from the beam. - - Returns: - Success message - """ - return self.send_command("retract") - - def power_supply_on(self) -> str: - """ - Turn on the detector power supply. - - Returns: - Success message - """ - return self.send_command("power_on") - - def power_supply_off(self) -> str: - """ - Turn off the detector power supply. - - Returns: - Success message - """ - return self.send_command("power_off") - - def read_voltages(self) -> Dict[str, List[float]]: - """ - Read the power supply voltages. - - Returns: - Dictionary with voltage values - """ - return self.send_command("read_voltages") - - # KTest command convenience methods - - def reset_connection(self) -> Any: - """ - Reset the connection between the k_test server and the detector. - - Returns: - Server response - """ - return self.send_command(KTestCommand.RESET_CONNECTION) - - def start_daq(self) -> Any: - """ - Start the data acquisition process. - - Returns: - Server response - """ - return self.send_command(KTestCommand.RESTART_DAQ) - - def stop_daq(self) -> Any: - """ - Stop the data acquisition process. - - Returns: - Server response - """ - return self.send_command(KTestCommand.STOP_DAQ) - - def set_exposure_mode(self, exposure_mode: ExposureModes) -> Any: - """ - Set the exposure mode for the detector. - + Subscribe to a status PV for live updates. + Args: - exposure_mode: The exposure mode to set - - Returns: - Server response + pv_name: PV suffix (e.g., "FRAME_RATE", "ACQUIRING"). + callback: Function called with the new value on each update. 
""" - return self.send_command(KTestCommand.SET_EXPOSURE_MODE, exposure_mode) - - def set_trigger_mode(self, trigger_mode: TriggerModes) -> Any: - """ - Set the trigger mode for the detector. - - Args: - trigger_mode: The trigger mode to set - - Returns: - Server response - """ - return self.send_command(KTestCommand.SET_TRIGGER_MODE, trigger_mode) - - def set_integration_time(self, integration_time_ms: float) -> Any: - """ - Set the integration time in milliseconds. - - Args: - integration_time_ms: The integration time in milliseconds - - Returns: - Server response - """ - return self.send_command(KTestCommand.SET_INTEGRATION_TIME, integration_time_ms) - - def load_parameter_file(self, file_path: str) -> Any: - """ - Load a parameter file for the detector. - - Args: - file_path: Path to the parameter file - - Returns: - Server response - """ - return self.send_command(KTestCommand.LOAD_PARAMETER_FILE, file_path) - + self.stop_monitor(pv_name) + + full_name = self._pv(pv_name) + + def _on_update(value): + try: + val = value.value if hasattr(value, 'value') else value + callback(val) + except Exception as e: + logger.debug("Monitor %s error: %s", pv_name, e) + + self._subscriptions[pv_name] = self._ctx.monitor(full_name, _on_update) + + def stop_monitor(self, name: str) -> None: + """Stop a specific monitor subscription.""" + sub = self._subscriptions.pop(name, None) + if sub is not None: + sub.close() + + def stop_all_monitors(self) -> None: + """Stop all active monitor subscriptions.""" + for sub in self._subscriptions.values(): + sub.close() + self._subscriptions.clear() + def close(self) -> None: - """Close the client connections.""" - # Unsubscribe if needed - self.unsubscribe_from_status() - - # Close command socket - if self.cmd_socket: - self.cmd_socket.close() - - # Terminate context - if self.context: - self.context.term() + """Close the client and release resources.""" + self.stop_all_monitors() + self._ctx.close() -class AsyncClient: - """ - Asynchronous 
client for the Team1k server. - - Provides async methods to send commands and receive status updates. - """ - - def __init__(self, host: str = 'localhost', cmd_port: int = 42003, pub_port: int = 42004): - """ - Initialize the asynchronous Team1k client. - - Args: - host: The hostname of the server - cmd_port: The command socket port - pub_port: The status publication port - """ - self.host = host - self.cmd_port = cmd_port - self.pub_port = pub_port - - # Set up ZMQ for commands - self.context = zmq.asyncio.Context() - self.cmd_socket = self.context.socket(zmq.REQ) - self.cmd_socket.connect(f"tcp://{host}:{cmd_port}") - - # Status subscriber - self.sub_socket = None - self._status_task = None - self._status_callbacks = [] - self._status_running = False - self._last_status = None - - async def send_command(self, command: Union[str, KTestCommand], *args) -> Any: - """ - Send a command to the server asynchronously. - - Args: - command: A string command or KTestCommand enum - *args: Arguments for the command - - Returns: - The server response - - Raises: - CommandException: If the command fails on the server side - Exception: For other errors - """ - # Format command string - if isinstance(command, KTestCommand): - cmd_str = f"{command.value.lower()} {' '.join(str(arg) for arg in args)}" - else: - cmd_str = f"{command} {' '.join(str(arg) for arg in args)}" - - # Send command - await self.cmd_socket.send_string(cmd_str.strip()) - - # Get response - response = await self.cmd_socket.recv_pyobj() - - # Handle exceptions from the server - if isinstance(response, Exception): - if isinstance(response, (ValueError, RuntimeError)): - raise CommandException(str(response)) - else: - raise response - - return response - - async def subscribe_to_status(self, callback: Callable[[Dict[str, Any]], None]) -> None: - """ - Subscribe to status updates asynchronously. 
- - Args: - callback: Function to call with status updates - """ - # Add callback if we're already subscribed - if self._status_running and self.sub_socket: - self._status_callbacks.append(callback) - return - - # Create a ZMQ SUB socket - sub_socket = self.context.socket(zmq.SUB) - sub_socket.setsockopt_string(zmq.SUBSCRIBE, '') - sub_socket.connect(f"tcp://{self.host}:{self.pub_port}") - - # Set up status task - self.sub_socket = sub_socket - self._status_callbacks = [callback] - self._status_running = True - self._status_task = asyncio.create_task(self._status_listener()) - - async def unsubscribe_from_status(self) -> None: - """Stop receiving status updates asynchronously.""" - if not self._status_running: - return - - # Stop the status task - self._status_running = False - if self._status_task: - self._status_task.cancel() - try: - await self._status_task - except asyncio.CancelledError: - pass - - # Close the socket - if self.sub_socket: - self.sub_socket.close() - self.sub_socket = None - - async def _status_listener(self) -> None: - """Background task to listen for status updates.""" - while self._status_running: - try: - # Receive message - status = await self.sub_socket.recv_pyobj() - - # Store status - self._last_status = status - - # Call callbacks - for callback in self._status_callbacks: - callback(status) - except asyncio.CancelledError: - # Task was cancelled - break - except Exception as e: - print(f"Error in status listener: {e}") - - async def get_status(self) -> Dict[str, Any]: - """ - Get the current status from the server asynchronously. - - Returns: - The current system status - """ - if self._last_status is not None: - return self._last_status - - return await self.send_command("get_status") - - async def insert_detector(self) -> str: - """ - Insert the detector into the beam asynchronously. 
- - Returns: - Success message - """ - return await self.send_command("insert") - - async def retract_detector(self) -> str: - """ - Retract the detector from the beam asynchronously. - - Returns: - Success message - """ - return await self.send_command("retract") - - async def power_supply_on(self) -> str: - """ - Turn on the detector power supply asynchronously. - - Returns: - Success message - """ - return await self.send_command("power_on") - - async def power_supply_off(self) -> str: - """ - Turn off the detector power supply asynchronously. - - Returns: - Success message - """ - return await self.send_command("power_off") - - async def read_voltages(self) -> Dict[str, List[float]]: - """ - Read the power supply voltages asynchronously. - - Returns: - Dictionary with voltage values - """ - return await self.send_command("read_voltages") - - # KTest command convenience methods - - async def reset_connection(self) -> Any: - """ - Reset the connection between the k_test server and the detector asynchronously. - - Returns: - Server response - """ - return await self.send_command(KTestCommand.RESET_CONNECTION) - - async def start_daq(self) -> Any: - """ - Start the data acquisition process asynchronously. - - Returns: - Server response - """ - return await self.send_command(KTestCommand.RESTART_DAQ) - - async def stop_daq(self) -> Any: - """ - Stop the data acquisition process asynchronously. - - Returns: - Server response - """ - return await self.send_command(KTestCommand.STOP_DAQ) - - async def set_exposure_mode(self, exposure_mode: ExposureModes) -> Any: - """ - Set the exposure mode for the detector asynchronously. - - Args: - exposure_mode: The exposure mode to set - - Returns: - Server response - """ - return await self.send_command(KTestCommand.SET_EXPOSURE_MODE, exposure_mode) - - async def set_trigger_mode(self, trigger_mode: TriggerModes) -> Any: - """ - Set the trigger mode for the detector asynchronously. 
- - Args: - trigger_mode: The trigger mode to set - - Returns: - Server response - """ - return await self.send_command(KTestCommand.SET_TRIGGER_MODE, trigger_mode) - - async def set_integration_time(self, integration_time_ms: float) -> Any: - """ - Set the integration time in milliseconds asynchronously. - - Args: - integration_time_ms: The integration time in milliseconds - - Returns: - Server response - """ - return await self.send_command(KTestCommand.SET_INTEGRATION_TIME, integration_time_ms) - - async def load_parameter_file(self, file_path: str) -> Any: - """ - Load a parameter file for the detector asynchronously. - - Args: - file_path: Path to the parameter file - - Returns: - Server response - """ - return await self.send_command(KTestCommand.LOAD_PARAMETER_FILE, file_path) - - async def close(self) -> None: - """Close the client connections asynchronously.""" - # Unsubscribe if needed - await self.unsubscribe_from_status() - - # Close command socket - if self.cmd_socket: - self.cmd_socket.close() - - # Terminate context - if self.context: - self.context.term() - # Example usage if __name__ == "__main__": - # Synchronous example + logging.basicConfig(level=logging.INFO) + client = Client() - print("Synchronous client:") try: print("Status:", client.get_status()) - print("Voltages:", client.read_voltages()) - except CommandException as e: - print(f"Command failed: {e}") - client.close() - - # Asynchronous example - async def main(): - client = AsyncClient() - print("\nAsynchronous client:") - try: - status = await client.get_status() - print("Status:", status) - voltages = await client.read_voltages() - print("Voltages:", voltages) - except CommandException as e: - print(f"Command failed: {e}") - await client.close() - - asyncio.run(main()) \ No newline at end of file + print("Acquiring:", client.is_acquiring()) + print("Frame rate:", client.get_frame_rate()) + except CommandError as e: + print(f"Error: {e}") + finally: + client.close() diff --git 
a/src/team1k/KTestClient.py b/src/team1k/KTestClient.py deleted file mode 100644 index 7d8a384..0000000 --- a/src/team1k/KTestClient.py +++ /dev/null @@ -1,191 +0,0 @@ -import socket -import time -import enum - -class KTestError(Exception): - """Custom exception for KTestClient errors.""" - pass - -class ExposureModes(enum.IntEnum): - """Exposure modes for the detector.""" - - ROLLING_SHUTTER = 0 - """Rolling shutter mode.""" - - ROLLING_SHUTTER_WITH_PAUSE = 1 - """Rolling shutter with pause mode.""" - - GLOBAL_SHUTTER = 2 - """Global shutter mode.""" - - GLOBAL_SHUTTER_CDS = 3 - """Global shutter with Correlated Double Sampling (CDS) using two images.""" - -class TriggerModes(enum.IntEnum): - """Trigger modes for the detector (more implemented but not supported by k_test server).""" - - INTERNAL_TRIGGER = 0 - """Internal trigger mode.""" - - EXTERNAL_TRIGGER = 1 - """External trigger mode.""" - -class KTestCommand(enum.Enum): - RESET_CONNECTION = "resetdetectorconnection" - """Reset the connection from k_test to the detector.""" - - # PRINT_EVERY_NTH = "printeverynth" - # """Set print frequency (requires value)""" - - TEST_MODE = "testmode" - """Enable/disable test mode (0 or 1)""" - - LOAD_PARAMETER_FILE = "parameterfile" - """Load parameter file (requires filename)""" - - READ_REGISTER = "readregister" - """Read register (requires address)""" - - WRITE_REGISTER = "writeregister" - """Write register (requires address and value)""" - - SET_ADC_CLOCK_FREQ = "setadcclockfreq" - """Set ADC clock frequency in MHz (50-100)""" - - SET_OUTPUT_DIR = "setoutputdir" - """Set output directory (requires path)""" - - ENABLE_FILE_WRITING = "enablefilewriting" - """Enable file writing""" - - DISABLE_FILE_WRITING = "disablefilewriting" - """Disable file writing""" - - SET_EXPOSURE_MODE = "setexposuremode" - """Set exposure mode (0-3)""" - - SET_TRIGGER_MODE = "settriggermode" - """Set trigger mode (0-2)""" - - SET_INTEGRATION_TIME = "setintegrationtime" - """Set integration time 
in ms""" - - SET_ADC_DATA_DELAY = "setadcdatadelay" - """Set ADC data delay""" - - RESTART_DAQ = "restartdaq" - """Restart DAQ""" - - STOP_DAQ = "stopdaq" - """Stop DAQ""" - -KTEST_COMMAND_ARGS = { - KTestCommand.RESET_CONNECTION: (), - # KTestCommand.PRINT_EVERY_NTH: (int,), - KTestCommand.TEST_MODE: (int,), - KTestCommand.LOAD_PARAMETER_FILE: (str,), - KTestCommand.READ_REGISTER: (int,), - KTestCommand.WRITE_REGISTER: (int, int), - KTestCommand.SET_ADC_CLOCK_FREQ: (float,), - KTestCommand.SET_OUTPUT_DIR: (str,), - KTestCommand.ENABLE_FILE_WRITING: (), - KTestCommand.DISABLE_FILE_WRITING: (), - KTestCommand.SET_EXPOSURE_MODE: (ExposureModes,), - KTestCommand.SET_TRIGGER_MODE: (TriggerModes,), - KTestCommand.SET_INTEGRATION_TIME: (float,), - KTestCommand.SET_ADC_DATA_DELAY: (int,), - KTestCommand.RESTART_DAQ: (), - KTestCommand.STOP_DAQ: (), -} - -class KTestClient: - def __init__(self, host='localhost', port=42003): - self.host = host - self.port = port - - def send_command(self, command: KTestCommand, *args) -> str: - """ - Send a command to the KTestServer and return the response. - - Args: - command (KTestCommand): Command enum to send - *args: Arguments for the command - Returns: - str: response_message - """ - if command not in KTEST_COMMAND_ARGS: - raise ValueError(f"Invalid command: {command}") - - expected_args = KTEST_COMMAND_ARGS[command] - if len(args) != len(expected_args): - raise ValueError(f"Invalid number of arguments for {command.value}. Expected {len(expected_args)}, got {len(args)}.") - - # Validate argument types - for i, (arg, expected_type) in enumerate(zip(args, expected_args)): - if not isinstance(arg, expected_type): - raise TypeError(f"Invalid argument type for argument {i+1} of {command.value}. 
Expected {expected_type.__name__}, got {type(arg).__name__}.") - - # Construct command string - command_str = command.value - if args: - command_str += ' ' + ' '.join(str(arg) for arg in args) - - ret_code, ret_msg = self.send_command_str(command_str) - if ret_code != 0: - raise KTestError(f"Command '{command_str}' failed with code {ret_code}: {ret_msg}") - return ret_msg - - def send_command_str(self, command) -> tuple[int, str]: - """ - Send a command to the KTestServer and return the response. - - Args: - command (str): Command string to send - - Returns: - tuple: (return_code, response_message) - """ - sock = None - try: - # Create TCP socket - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect((self.host, self.port)) - - # Send command - sock.send(command.encode('utf-8')) - - # Receive response - response = b"" - while True: - chunk = sock.recv(4096) - if not chunk: - break - response += chunk - - sock.close() - - # Parse response - return code is at the beginning - response_str = response.decode('utf-8') - lines = response_str.strip().split('\n') - - # Extract return code (first number in response) - return_code = None - for line in lines: - if line.strip().isdigit() or (line.strip().startswith('-') and line.strip()[1:].isdigit()): - return_code = int(line.strip()) - break - - if return_code is None: - raise KTestError(f"Invalid response from server: {response_str}") - - return return_code, response_str - except ConnectionRefusedError: - raise ConnectionRefusedError("Connection error: Server is not running or unreachable. 
Start the k_test server first.") - except Exception as e: - raise e - finally: - try: - if sock is not None: - sock.close() - except: - pass diff --git a/src/team1k/__init__.py b/src/team1k/__init__.py index 8f81f07..03f35f8 100644 --- a/src/team1k/__init__.py +++ b/src/team1k/__init__.py @@ -1,8 +1,8 @@ -from .Client import Client, AsyncClient -from .KTestClient import KTestCommand, KTestError, ExposureModes, TriggerModes +from .Client import Client, ExposureModes, TriggerModes, CommandError __all__ = [ - 'Client', 'AsyncClient', - 'KTestCommand', 'KTestError', - 'ExposureModes', 'TriggerModes' -] \ No newline at end of file + 'Client', + 'ExposureModes', + 'TriggerModes', + 'CommandError', +] diff --git a/src/team1k/acquisition/__init__.py b/src/team1k/acquisition/__init__.py new file mode 100644 index 0000000..10ca3e5 --- /dev/null +++ b/src/team1k/acquisition/__init__.py @@ -0,0 +1 @@ +from .receiver import AcquisitionProcess diff --git a/src/team1k/acquisition/receiver.py b/src/team1k/acquisition/receiver.py new file mode 100644 index 0000000..52f351a --- /dev/null +++ b/src/team1k/acquisition/receiver.py @@ -0,0 +1,366 @@ +""" +Dedicated subprocess for receiving detector UDP data at high throughput. + +Ports MainDAQThread + GetImages from KTestFunctions.cxx (lines 868-1047). +Runs as a multiprocessing.Process, communicates via locking_shmring and a pipe. +""" + +import struct +import time +import logging +import multiprocessing +from multiprocessing.connection import Connection + +import numpy as np + +from ..detector.chip_config import ChipConfig, TEAM1K_CONFIG + +logger = logging.getLogger(__name__) + +# Commands sent over the pipe from the main process +CMD_START = "start" +CMD_STOP = "stop" +CMD_SHUTDOWN = "shutdown" + + +class AcquisitionProcess: + """ + Manages a dedicated subprocess for UDP data acquisition. 
+ + The subprocess receives raw UDP packets from the detector, assembles them + into complete frames, and writes them to a shared memory ring buffer + (locking_shmring). The main process reads from the ring buffer for PVA + streaming and file writing. + """ + + def __init__(self, detector_ip: str, data_port: int = 41000, + ring_name: str = "team1k_frames", num_ring_slots: int = 32, + chip_config: ChipConfig = TEAM1K_CONFIG): + self._detector_ip = detector_ip + self._data_port = data_port + self._ring_name = ring_name + self._num_ring_slots = num_ring_slots + self._config = chip_config + self._process: multiprocessing.Process | None = None + self._parent_conn: Connection | None = None + + @property + def ring_name(self) -> str: + return self._ring_name + + @property + def is_alive(self) -> bool: + return self._process is not None and self._process.is_alive() + + def start_process(self) -> None: + """Start the acquisition subprocess.""" + if self.is_alive: + logger.warning("Acquisition process already running") + return + + parent_conn, child_conn = multiprocessing.Pipe() + self._parent_conn = parent_conn + + self._process = multiprocessing.Process( + target=_acquisition_main, + args=( + self._detector_ip, + self._data_port, + self._ring_name, + self._num_ring_slots, + self._config, + child_conn, + ), + daemon=True, + name="team1k-acquisition", + ) + self._process.start() + child_conn.close() # Parent doesn't need the child end + logger.info("Acquisition subprocess started (PID %d)", self._process.pid) + + def start_acquisition(self) -> None: + """Tell the subprocess to start acquiring data.""" + if self._parent_conn: + self._parent_conn.send(CMD_START) + + def stop_acquisition(self) -> None: + """Tell the subprocess to stop acquiring data (but keep running).""" + if self._parent_conn: + self._parent_conn.send(CMD_STOP) + + def shutdown(self, timeout: float = 5.0) -> None: + """Shut down the subprocess completely.""" + if self._parent_conn: + 
self._parent_conn.send(CMD_SHUTDOWN) + self._parent_conn.close() + self._parent_conn = None + + if self._process and self._process.is_alive(): + self._process.join(timeout=timeout) + if self._process.is_alive(): + logger.warning("Acquisition process did not exit, terminating") + self._process.terminate() + self._process.join(timeout=2.0) + + self._process = None + logger.info("Acquisition subprocess shut down") + + +def _acquisition_main(detector_ip: str, data_port: int, + ring_name: str, num_slots: int, + config: ChipConfig, cmd_pipe: Connection) -> None: + """ + Main function running in the acquisition subprocess. + + This must not import asyncio or anything that would interfere with the + tight recv loop. + """ + # Import here to avoid importing in the main process unnecessarily + import socket as _socket + import select as _select + from locking_shmring import RingBufferWriter + + logging.basicConfig(level=logging.INFO) + proc_logger = logging.getLogger("team1k.acquisition") + + # Create shared memory ring buffer + writer = RingBufferWriter.create_numpy( + ring_name, num_slots, (config.image_nx, config.image_ny), np.uint16 + ) + proc_logger.info("Ring buffer '%s' created: %d slots of %dx%d uint16", + ring_name, num_slots, config.image_nx, config.image_ny) + + # Create UDP data socket + sock = _socket.socket(_socket.AF_INET, _socket.SOCK_DGRAM) + sock.bind(('', 0)) + + # Try to set large socket buffer + for buf_size in [400 * 1024 * 1024, 5 * 1024 * 1024]: + try: + sock.setsockopt(_socket.SOL_SOCKET, _socket.SO_RCVBUF, buf_size) + actual = sock.getsockopt(_socket.SOL_SOCKET, _socket.SO_RCVBUF) + if actual >= buf_size // 2: + proc_logger.info("Socket buffer set to %d bytes (actual: %d)", buf_size, actual) + break + except OSError: + continue + + local_port = sock.getsockname()[1] + dest_addr = (detector_ip, data_port) + + # Perform loopback to register with detector + from ..detector.udp_transport import SECURITY_KEY + _perform_loopback(sock, dest_addr, 
SECURITY_KEY, proc_logger) + + # Pre-allocate packet buffer + packet_buf = bytearray(config.bytes_per_packet) + header_view = memoryview(packet_buf)[:config.packet_header_size] + + acquiring = False + running = True + frame_count = 0 + + proc_logger.info("Acquisition subprocess ready (local port %d)", local_port) + + while running: + # Check for commands (non-blocking) + if cmd_pipe.poll(0.1 if not acquiring else 0): + try: + cmd = cmd_pipe.recv() + except EOFError: + break + + if cmd == CMD_START: + acquiring = True + proc_logger.info("Acquisition started") + # Clear any stale data in socket + _clear_socket(sock) + elif cmd == CMD_STOP: + acquiring = False + proc_logger.info("Acquisition stopped (frame_count=%d)", frame_count) + elif cmd == CMD_SHUTDOWN: + running = False + continue + + if not acquiring: + continue + + # Acquire one complete frame + slot = writer.start_write(timeout_ms=100) + if slot is None: + proc_logger.warning("Ring buffer full, dropping frame") + # Drain one frame's worth of packets to stay in sync + _skip_frame(sock, config) + continue + + frame_arr = slot.data_as_numpy((config.image_ny, config.image_nx), np.uint16) + frame_arr_flat = frame_arr.ravel() + + frame_id, ok = _assemble_frame( + sock, packet_buf, frame_arr_flat, config, proc_logger + ) + + if ok: + slot.commit(frame_id=frame_count, timestamp_ns=time.time_ns()) + frame_count += 1 + else: + slot.abort() + + # Clean shutdown + writer.signal_shutdown() + writer.close() + sock.close() + proc_logger.info("Acquisition subprocess exiting (total frames: %d)", frame_count) + + +def _perform_loopback(sock, dest_addr, security_key, log) -> bool: + """Perform loopback test to register with detector data port.""" + import struct as _struct + + nwords = 20 + parts = [_struct.pack(' None: + """Drain all pending data from socket.""" + sock.setblocking(False) + try: + while True: + try: + sock.recv(65536) + except BlockingIOError: + break + finally: + sock.setblocking(True) + + +def 
_skip_frame(sock, config: ChipConfig) -> None: + """Drain packets for one frame to stay in sync.""" + sock.setblocking(False) + try: + for _ in range(config.npackets_per_image): + try: + sock.recv(config.bytes_per_packet) + except BlockingIOError: + break + finally: + sock.setblocking(True) + + +def _assemble_frame(sock, packet_buf: bytearray, frame_flat: np.ndarray, + config: ChipConfig, log) -> tuple[int, bool]: + """ + Assemble one complete frame from UDP packets. + + Ported from KTestFunctions::GetImages (lines 974-1047). + + Each packet: 8-byte header + pixel data + Header format: [frame_number(u32, stored as 2xu16), reserved(u16), packet_number(u16)] + + Args: + sock: UDP socket to receive from. + packet_buf: Pre-allocated buffer for one packet. + frame_flat: Flat numpy view of the frame to write into. + config: Chip configuration. + log: Logger. + + Returns: + Tuple of (frame_number, success). + """ + import select as _select + + npackets = config.npackets_per_image + npixels_per_pkt = config.npixels_per_packet + expected_size = config.bytes_per_packet + header_size = config.packet_header_size + + expected_packet_num = 0 + last_frame_number = 0 + realigning = False + frame_number = 0 + + i = 0 + while i < npackets: + # Receive a packet + while True: + ready, _, _ = _select.select([sock], [], [], 0.12) + if not ready: + continue + + nbytes = sock.recv_into(packet_buf) + + if nbytes == 0: + continue + elif nbytes != expected_size: + log.error("Wrong packet size: expected %d, got %d", expected_size, nbytes) + return 0, False + + break + + # Parse header + frame_number = struct.unpack_from(' tuple[bool, list[int]]: + """ + Write I2C data via FPGA register interface. + + Ported from KTestFunctions::WriteADCI2C (lines 131-175). + + The FPGA has an I2C controller accessible through registers 90 (write) + and 91 (read). Data is written to a 128-entry RAM, then execution is + triggered. Results are read back from the same RAM. 
+ + Args: + tx_data: List of I2C command bytes to send (max 123 entries). + print_info: Whether to log detailed I2C transaction info. + + Returns: + Tuple of (success, rx_data_list). + """ + n = len(tx_data) + if n > 123: + logger.error("I2C write list too long: %d (max 123)", n) + return False, [] + + # Write the command data to FPGA I2C RAM (128 entries) + for i in range(128): + value = tx_data[i] if i < n else (0x100 | CMD_NOP) + if i < n and print_info: + logger.debug("I2C TX %d) 0x%x", i, tx_data[i]) + # Write sequence: clear, set write-enable, clear + self._reg.write_register(90, (i << 16) | value) + self._reg.write_register(90, 0x40000000 | (i << 16) | value) + self._reg.write_register(90, (i << 16) | value) + + # Trigger I2C execution (write n entries) + self._reg.write_register(90, (n << 16)) + self._reg.write_register(90, 0x80000000 | (n << 16)) + self._reg.write_register(90, (n << 16)) + + # Wait for I2C transaction to complete + time.sleep(1.0) + + # Read back results + ok = True + rx_data = [] + for i in range(n): + self._reg.write_register(90, (i << 16)) + self._reg.write_register(90, (i << 16)) + value = self._reg.read_register(91) + rx_data.append(value & 0x1FF) + if print_info: + logger.debug("I2C RX %d) tx=0x%x rx=0x%x", i, tx_data[i], value) + # Check for error: bit 8 set means NACK (except for CMD_RD_NP) + if tx_data[i] != (0x100 | CMD_RD_NP) and (value & 0x100) != 0: + logger.error("I2C error at %d: tx=0x%x rx=0x%x", i, tx_data[i], value) + ok = False + + return ok, rx_data + + def set_clock_freq(self, freq_mhz: float, print_info: bool = True) -> bool: + """ + Set ADC clock frequency via Si570 I2C programming. + + Ported from KTestFunctions::SetADCClockFreq (lines 186-336). + + The Si570 is a programmable oscillator controlled via I2C. + Algorithm: + 1. Find best HS divider and N1 values for target frequency + 2. Compute new RFREQ value from fdco/fxtal + 3. 
Build I2C sequence: freeze DCO -> write registers -> unfreeze -> apply + + Args: + freq_mhz: Target frequency in MHz (50-100). + print_info: Whether to log computation details. + + Returns: + True on success. + """ + if freq_mhz < 50 or freq_mhz > 100: + logger.error("Frequency out of range: %.1f MHz (must be 50-100)", freq_mhz) + return False + + # Find best HS divider and N1 combination + best_diff = None + best_hs_index = 0 + best_hs_div = 4 + best_n1 = 2 + + for i in range(8): + if _HS_VALUES[i] is None: # skip invalid indices 4 and 6 + continue + hs = _HS_VALUES[i] + # C++ formula: (int(fdco/freq/hs+1)/2*2) & 0x7f + n_temp = (int(_FDCO_TARGET / freq_mhz / hs + 1) // 2 * 2) & 0x7F + diff = abs(_FDCO_TARGET - freq_mhz * hs * n_temp) + if best_diff is None or diff <= best_diff: + best_hs_index = i + best_hs_div = hs + best_n1 = n_temp + best_diff = diff + + fdco = freq_mhz * best_hs_div * best_n1 + fxtal = _FXTAL + + # Compute new RFREQ register value + new_value = int(fdco / fxtal * (2 ** 28)) + + if print_info: + logger.info("Setting ADC clock to %.1f MHz", freq_mhz) + logger.info(" fdco=%.1f MHz, hs_div=%d, n1=%d", + fdco, best_hs_div, best_n1) + logger.info(" RFREQ register: 0x%x", new_value) + + # Build I2C command sequence + i2c_addr_write = _I2C_ADDRESS << 1 + + # NOP padding at the start + seq = [ + 0x100 | CMD_NOP, # NOP + 0x100 | CMD_NOP, # NOP + 0x100 | CMD_NOP, # NOP + 0x100 | CMD_NOP, # NOP + ] + + # Phase 1: Freeze DCO (register 137 = 0x10) + seq += [ + 0x100 | CMD_NOP, # NOP + 0x100 | CMD_START, # START + i2c_addr_write, # Si570 address, write + 137, # DCO register address + 0x10, # Freeze DCO + 0x100 | CMD_STOP, # STOP + 0x100 | CMD_NOP, # NOP + 0x100 | CMD_NOP, # NOP + ] + + # Phase 2: Write new HS_DIV, N1, and RFREQ (registers 7-12) + reg7 = ((best_hs_index & 0x7) << 5) | (((best_n1 - 1) >> 2) & 0x1F) + reg8 = (((best_n1 - 1) & 0x3) << 6) | ((new_value >> 32) & 0x3F) + reg9 = (new_value >> 24) & 0xFF + reg10 = (new_value >> 16) & 0xFF + reg11 = 
(new_value >> 8) & 0xFF + reg12 = new_value & 0xFF + + seq += [ + 0x100 | CMD_NOP, # NOP + 0x100 | CMD_START, # START + i2c_addr_write, # Si570 address, write + 7, # Start register address + reg7, # HS_DIV[2:0] | N1[6:2] + reg8, # N1[1:0] | RFREQ[37:32] + reg9, # RFREQ[31:24] + reg10, # RFREQ[23:16] + reg11, # RFREQ[15:8] + reg12, # RFREQ[7:0] + 0x100 | CMD_STOP, # STOP + 0x100 | CMD_NOP, # NOP + 0x100 | CMD_NOP, # NOP + ] + + # Phase 3: Unfreeze DCO (register 137 = 0x00) + seq += [ + 0x100 | CMD_NOP, # NOP + 0x100 | CMD_START, # START + i2c_addr_write, # Si570 address, write + 137, # DCO register address + 0x00, # Unfreeze DCO + 0x100 | CMD_STOP, # STOP + 0x100 | CMD_NOP, # NOP + 0x100 | CMD_NOP, # NOP + ] + + # Phase 4: Apply new frequency (register 135 = 0x40) + seq += [ + 0x100 | CMD_NOP, # NOP + 0x100 | CMD_START, # START + i2c_addr_write, # Si570 address, write + 135, # NewFreq register + 0x40, # Apply + 0x100 | CMD_STOP, # STOP + 0x100 | CMD_NOP, # NOP + 0x100 | CMD_NOP, # NOP + 0x100 | CMD_NOP, # NOP + ] + + # Execute the I2C sequence + ok, _ = self.write_adc_i2c(seq, print_info=print_info) + + if ok: + self.current_freq_mhz = freq_mhz + logger.info("ADC clock frequency set to %.1f MHz", freq_mhz) + else: + logger.error("Failed to set ADC clock frequency") + + return ok diff --git a/src/team1k/detector/chip_config.py b/src/team1k/detector/chip_config.py new file mode 100644 index 0000000..d629fd8 --- /dev/null +++ b/src/team1k/detector/chip_config.py @@ -0,0 +1,88 @@ +""" +Chip configuration constants and register setup. + +Ports KTestFunctions::SetChipType("team1k") from KTestFunctions.cxx (lines 527-545). 
+""" + +import dataclasses +import logging + +from .registers import RegisterInterface + +logger = logging.getLogger(__name__) + + +@dataclasses.dataclass(frozen=True) +class ChipConfig: + """Immutable chip configuration.""" + + image_nx: int = 1024 + image_ny: int = 1024 + nimages_per_buffer: int = 16 + n64words_per_packet: int = 1024 # 4 rows per packet + npixels_per_packet: int = 4096 # 4 * n64words_per_packet + npackets_per_image: int = 256 # image_ny / 4 + bytes_per_pixel: int = 2 # uint16 + packet_header_size: int = 8 # 8-byte header per UDP packet + + # Register values for chip initialization (from SetChipType) + reg_ftc_rst_mode: int = 0 # register 21 + reg_preburner: int = 7 << 16 # register 25 + reg_pixels_rows: int = (64 << 16) | 1024 # register 26: 64 cols/adc, 1024 rows + reg_packets_words: int = (255 << 16) | 1023 # register 40: npackets-1, n64words-1 + + @property + def bytes_per_packet(self) -> int: + """Total bytes per UDP data packet (header + pixel data).""" + return self.packet_header_size + self.npixels_per_packet * self.bytes_per_pixel + + @property + def image_size_bytes(self) -> int: + """Bytes per image frame.""" + return self.image_nx * self.image_ny * self.bytes_per_pixel + + @property + def buffer_size_bytes(self) -> int: + """Bytes per multi-image buffer.""" + return self.image_size_bytes * self.nimages_per_buffer + + @property + def npixels_per_buffer(self) -> int: + return self.image_nx * self.image_ny * self.nimages_per_buffer + + @property + def ntransfers_per_file(self) -> int: + """Number of buffer transfers per ~2 GB file.""" + return (2 * 1024 * 1024 * 1024) // (self.bytes_per_pixel * self.npixels_per_buffer) + + +# Default team1k configuration +TEAM1K_CONFIG = ChipConfig() + + +def configure_chip(registers: RegisterInterface, + config: ChipConfig = TEAM1K_CONFIG) -> bool: + """ + Write chip type registers to the detector. + + Args: + registers: RegisterInterface for communicating with the detector. 
logger = logging.getLogger(__name__)


class DetectorCommands:
    """High-level detector control operations via register writes.

    Each method writes one or a few FPGA control registers through the
    supplied register interface and returns True on success.
    """

    def __init__(self, registers: "RegisterInterface"):
        # Forward-reference annotation so this module stands alone for tools.
        self._reg = registers
        self._exposure_mode: int = 0  # last mode requested (raw, unmasked)

    @property
    def exposure_mode(self) -> int:
        """Last exposure mode requested via set_exposure_mode (raw value)."""
        return self._exposure_mode

    def set_exposure_mode(self, mode: int) -> bool:
        """
        Set exposure mode.

        Args:
            mode: 0=rolling shutter, 1=rolling shutter with pause,
                  2=global shutter, 3=global shutter CDS (double image).

        Writes register 22.
        """
        # NOTE: the raw mode is cached, but only the low 2 bits reach hardware.
        self._exposure_mode = mode
        ok = self._reg.write_register(22, mode & 0x3)
        if ok:
            logger.info("Set exposure mode to %d", mode)
        else:
            logger.error("Failed to set exposure mode to %d", mode)
        return ok

    def set_trigger_mode(self, external: bool, polarity: bool = False,
                         window_trigger: bool = False) -> bool:
        """
        Set trigger mode.

        Args:
            external: True for external trigger, False for internal.
            polarity: Trigger edge polarity (True=falling, False=rising).
            window_trigger: True for external trigger window mode.

        Writes register 23 (bit0=external, bit1=window, bit2=polarity).
        """
        value = (1 if external else 0)
        value |= (2 if window_trigger else 0)
        value |= (4 if polarity else 0)
        ok = self._reg.write_register(23, value)
        if ok:
            mode_str = "internal"
            if external and window_trigger:
                mode_str = "external window"
            elif external:
                mode_str = f"external {'falling' if polarity else 'rising'}-edge"
            logger.info("Set trigger mode to %s", mode_str)
        return ok

    def set_integration_time(self, time_ms: float) -> bool:
        """
        Set integration time.

        Args:
            time_ms: Integration time in milliseconds.
                     Converted to 20us steps internally.

        Writes register 24.
        """
        # NOTE(review): the docstring says milliseconds, but dividing by 20e-6
        # treats the value as seconds (1.0 -> 50000 steps = 1 s). Confirm the
        # intended unit against the C++ SetIntegrationTime before changing.
        value = int(math.floor((time_ms + 10e-6) / 20e-6)) if time_ms > 0 else 0
        ok = self._reg.write_register(24, value)
        if ok:
            logger.info("Set integration time to %.3f ms (register value: %d)", time_ms, value)
        return ok

    def set_adc_data_delay(self, delay: int) -> bool:
        """
        Set ADC data delay for all 4 ADCs.

        Args:
            delay: Delay value.

        Writes registers 32-35; succeeds only if every write succeeds.
        """
        ok = True
        for i in range(4):
            ok &= self._reg.write_register(32 + i, delay)
        if ok:
            logger.info("Set ADC data delay to 0x%x", delay)
        return ok

    def set_adc_data_averaging(self, power_of_2: int) -> bool:
        """
        Set ADC data averaging.

        Args:
            power_of_2: Averaging factor as power of 2 (0-7).

        Writes register 36.
        """
        ok = self._reg.write_register(36, power_of_2 & 0x7)
        if ok:
            logger.info("Set ADC data averaging to 2^%d", power_of_2)
        return ok

    def enable_fpga_test_data(self, enable: bool = True) -> bool:
        """
        Enable or disable FPGA test data generation.

        Writes register 37.
        """
        ok = self._reg.write_register(37, 1 if enable else 0)
        logger.info("FPGA test data %s", "enabled" if enable else "disabled")
        return ok

    def start_data_flow(self) -> bool:
        """
        Start data acquisition.

        Resets frame counter and timestamp (register 20 = 0xC0000000),
        then enables DAQ (register 20 = 0x00000001). Succeeds only when
        BOTH register writes succeed.
        """
        ok = self._reg.write_register(20, 0xC0000000)  # reset
        time.sleep(0.00001)  # 10 us settling time between reset and enable
        # Both writes must succeed; '|=' here previously reported success
        # when only one of the two writes went through.
        ok &= self._reg.write_register(20, 0x00000001)  # enable
        if ok:
            logger.info("Data flow started")
        return ok

    def stop_data_flow(self) -> bool:
        """
        Stop data acquisition.

        Resets frame counter and timestamp (register 20 = 0xC0000000).
        """
        ok = self._reg.write_register(20, 0xC0000000)
        if ok:
            logger.info("Data flow stopped")
        return ok

    def read_firmware_version(self) -> int:
        """Read firmware version from register 0xFFFF."""
        return self._reg.read_register(0xFFFF)

    def write_adc_spi(self, value: int) -> tuple[bool, int]:
        """
        Write ADC SPI value and read back.

        Writes register 80, waits 100ms, reads register 81.

        Returns:
            Tuple of (success, return_value).
        """
        ok = self._reg.write_register(80, value)
        time.sleep(0.1)
        return_value = self._reg.read_register(81)
        logger.info("ADC SPI write: 0x%x, read: 0x%x", value, return_value)
        return ok, return_value
+""" + +import struct +import logging + +from .udp_transport import UDPSocket, SECURITY_KEY + +logger = logging.getLogger(__name__) + +# Default buffer sizes +_LARGE_BUFFER = 400 * 1024 * 1024 # 400 MB +_SMALL_BUFFER = 5 * 1024 * 1024 # 5 MB fallback + +# Loopback test constants +_LOOPBACK_NWORDS = 20 +_LOOPBACK_PACKET_SIZE = (2 * _LOOPBACK_NWORDS + 2) * 4 # 168 bytes + + +class DataPortError(Exception): + """Error during data port operations.""" + pass + + +class DataPort: + """ + Manages the data channel (UDP) to the detector. + + The data port receives high-throughput image data as UDP packets. + After setup, a loopback test registers the client with the detector. + """ + + DEFAULT_PORT = 41000 + + def __init__(self, detector_ip: str, port: int = DEFAULT_PORT, + dst_table_offset: int = 0, + buffer_size: int = _LARGE_BUFFER): + """ + Open a data port and perform the loopback registration. + + Args: + detector_ip: Detector IP address. + port: Detector data port (default 41000). + dst_table_offset: Destination table offset for detector routing. + buffer_size: Kernel receive buffer size (400 MB default, falls back to 5 MB). 
+ """ + # Try large buffer first, fall back to smaller + try: + self._socket = UDPSocket(detector_ip, port, recv_buffer_size=buffer_size) + actual = self._socket._sock.getsockopt( + __import__('socket').SOL_SOCKET, __import__('socket').SO_RCVBUF + ) + # Linux doubles the requested value; check if we got at least half + if actual < buffer_size // 2: + raise OSError(f"Buffer too small: {actual}") + except OSError: + logger.warning("Could not set socket buffer to %d bytes, trying %d", + buffer_size, _SMALL_BUFFER) + self._socket = UDPSocket(detector_ip, port, recv_buffer_size=_SMALL_BUFFER) + + # Pre-allocate recv buffer for the maximum packet size + self._recv_buf = bytearray(65536) + + # Perform loopback to register with detector + if not self._perform_loopback(dst_table_offset): + self._socket.close() + raise DataPortError("Data port loopback test failed") + + logger.info("Data port opened on local port %d", self._socket.local_port) + + def _perform_loopback(self, dst_table_offset: int) -> bool: + """ + Send a loopback packet to register this client with the detector. + + The detector echoes the packet back to confirm the connection. + + Format: [security_key(u32), dst_table_offset(u32), data_words(20 x u32)...] + Total: (2*20+2) * 4 = 168 bytes + """ + # Build loopback packet + parts = [struct.pack(' int: + """ + Receive a single data packet into the provided buffer. + + The buffer should be large enough for header + data (e.g., 8200 bytes + for team1k: 8 header + 8192 pixel data). + + Args: + buffer: Pre-allocated buffer to receive into. + timeout_sec: Timeout in seconds (0.12 = 120ms default). + + Returns: + Number of bytes received, 0 on timeout. 
+ """ + return self._socket.recv_into(buffer, timeout_sec=timeout_sec) + + def clear(self) -> None: + """Drain all buffered data packets.""" + self._socket.clear_buffer() + + def close(self) -> None: + """Close the data port.""" + self._socket.close() + + def __enter__(self): + return self + + def __exit__(self, *args): + self.close() diff --git a/src/team1k/detector/parameter_file.py b/src/team1k/detector/parameter_file.py new file mode 100644 index 0000000..b9c1c19 --- /dev/null +++ b/src/team1k/detector/parameter_file.py @@ -0,0 +1,179 @@ +""" +Parameter file parser. + +Ports KTestFunctions::ReadParameterFile from KTestFunctions.cxx (lines 1078-1195). +""" + +import logging +from pathlib import Path +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from .registers import RegisterInterface + from .commands import DetectorCommands + from .adc import ADCController + +logger = logging.getLogger(__name__) + + +def _parse_int(s: str) -> int: + """Parse an integer from string, supporting both decimal and hex (0x prefix).""" + s = s.strip() + if 'x' in s or 'X' in s: + return int(s, 16) + return int(s) + + +def apply_parameter_file( + filepath: str, + commands: 'DetectorCommands', + adc: 'ADCController', + registers: 'RegisterInterface', + restart_daq_callback=None, +) -> tuple[bool, str]: + """ + Parse and apply all settings from a parameter file. + + The file format is one setting per line: + keyword value1 [value2 ...] + Lines starting with '--' are comments. + + Supported keywords: + ADC_clock_frequency_in_MHz + ADC_data_delay + chip_type (ignored — only team1k is supported) + exposure_mode + trigger_mode + integration_time + digital_signal_polarity + digital_signal_order_7downto0 + digital_signal_order_15downto8 + ADC_order_7downto0 + ADC_order_15downto8 + restart_daq + + Args: + filepath: Path to the parameter file. + commands: DetectorCommands instance. + adc: ADCController instance. + registers: RegisterInterface instance. 
+ restart_daq_callback: Optional callable to restart DAQ (for 'restart_daq' keyword). + + Returns: + Tuple of (success, output_message). + """ + path = Path(filepath) + if not path.is_file(): + msg = f"Parameter file not found: {filepath}" + logger.error(msg) + return False, msg + + output = [f"Reading parameter file: {filepath}"] + logger.info("Reading parameter file: %s", filepath) + + with open(path, 'r') as f: + for line_num, raw_line in enumerate(f, start=1): + line = raw_line.strip() + if not line: + continue + if line.startswith('--'): + continue + + parts = line.split() + if not parts: + continue + + keyword = parts[0] + args = parts[1:] + + ok = False + msg = "" + + try: + if keyword == "ADC_clock_frequency_in_MHz": + freq = float(args[0]) + ok = adc.set_clock_freq(freq) + msg = f"Set ADC clock frequency to {freq} MHz" + + elif keyword == "ADC_data_delay": + delay = _parse_int(args[0]) + ok = commands.set_adc_data_delay(delay) + msg = f"Set ADC data delay to 0x{delay:x}" + + elif keyword == "chip_type": + if args[0] == "team1k": + ok = True + msg = "Chip type: team1k (confirmed)" + else: + msg = f"Unsupported chip type: {args[0]} (only team1k supported)" + ok = False + + elif keyword == "exposure_mode": + mode = _parse_int(args[0]) + ok = commands.set_exposure_mode(mode) + msg = f"Set exposure mode to {mode}" + + elif keyword == "trigger_mode": + external = bool(_parse_int(args[0])) + polarity = bool(_parse_int(args[1])) if len(args) > 1 else False + ok = commands.set_trigger_mode(external, polarity) + msg = f"Set trigger mode: external={external}, polarity={polarity}" + + elif keyword == "integration_time": + time_ms = float(args[0]) + ok = commands.set_integration_time(time_ms) + msg = f"Set integration time to {time_ms} ms" + + elif keyword == "digital_signal_polarity": + val = _parse_int(args[0]) + ok = registers.write_register(27, val) + msg = f"Set digital signal polarity to {args[0]}" + + elif keyword == "digital_signal_order_7downto0": + val = 
_parse_int(args[0]) + ok = registers.write_register(28, val) + msg = f"Set digital signals 7:0 order to {args[0]}" + + elif keyword == "digital_signal_order_15downto8": + val = _parse_int(args[0]) + ok = registers.write_register(29, val) + msg = f"Set digital signals 15:8 order to {args[0]}" + + elif keyword == "ADC_order_7downto0": + val = _parse_int(args[0]) + ok = registers.write_register(30, val) + msg = f"Set ADC 7:0 order to {args[0]}" + + elif keyword == "ADC_order_15downto8": + val = _parse_int(args[0]) + ok = registers.write_register(31, val) + msg = f"Set ADC 15:8 order to {args[0]}" + + elif keyword == "restart_daq": + if restart_daq_callback: + ok = restart_daq_callback() + else: + ok = True + msg = "Restarted DAQ" + + else: + msg = f"Unknown keyword: {keyword}" + ok = False + + except (IndexError, ValueError) as e: + msg = f"Error parsing line {line_num}: {line} ({e})" + ok = False + + if ok: + output.append(f" {msg}") + else: + error_msg = f" Line {line_num}: {msg or f'Error: {line}'}" + output.append(error_msg) + logger.error("Parameter file error at line %d: %s", line_num, msg or line) + result = "\n".join(output) + return False, result + + output.append(f"Done reading parameter file: {filepath}") + result = "\n".join(output) + logger.info("Parameter file applied successfully") + return True, result diff --git a/src/team1k/detector/registers.py b/src/team1k/detector/registers.py new file mode 100644 index 0000000..a3bec7c --- /dev/null +++ b/src/team1k/detector/registers.py @@ -0,0 +1,217 @@ +""" +Register read/write protocol over UDP. + +Ports the register operations from NetworkInterface.cxx (lines 83-158). +Communicates with the detector's register port (default 42000). 
+""" + +import struct +import logging +import threading + +from .udp_transport import UDPSocket, SECURITY_KEY + +logger = logging.getLogger(__name__) + +# Register protocol constants +_WRITE_BIT = 0x80000000 # Bit 31 set = write operation +_ADDR_ECHO_MASK = 0x70000000 # Detector ORs this into echoed addresses + + +class RegisterError(Exception): + """Error during register read/write.""" + pass + + +class RegisterInterface: + """ + Register read/write over UDP. + + Each transaction sends a packet with one or more register operations and + waits for the detector to echo the packet back with results filled in. + """ + + DEFAULT_PORT = 42000 + + def __init__(self, detector_ip: str, port: int = DEFAULT_PORT): + self._socket = UDPSocket(detector_ip, port, recv_buffer_size=4096) + self._lock = threading.Lock() + + def read_register(self, address: int) -> int: + """ + Read a single register. + + Args: + address: Register address. + + Returns: + Register value. + + Raises: + RegisterError: On communication failure. + """ + values = self.read_registers([address]) + return values[0] + + def write_register(self, address: int, value: int) -> bool: + """ + Write a single register. + + Args: + address: Register address. + value: Value to write. + + Returns: + True on success. + + Raises: + RegisterError: On communication failure. + """ + return self.write_registers([(address, value)]) + + def read_registers(self, addresses: list[int]) -> list[int]: + """ + Read multiple registers in one packet. + + Args: + addresses: List of register addresses to read. + + Returns: + List of register values (same order as addresses). + + Raises: + RegisterError: On communication failure. 
+ """ + entries = [(addr, 0) for addr in addresses] + response = self._transact(entries, write=False) + + # Extract values from response + values = [] + for i in range(len(addresses)): + offset = 8 + i * 8 + 4 # skip header(8) + addr(4) per entry + val = struct.unpack_from(' bool: + """ + Write multiple registers in one packet. + + Args: + address_value_pairs: List of (address, value) tuples. + + Returns: + True on success. + + Raises: + RegisterError: On communication failure. + """ + self._transact(address_value_pairs, write=True) + return True + + def _transact(self, entries: list[tuple[int, int]], write: bool) -> bytes: + """ + Execute a register transaction (send packet, receive echo). + + Packet format: + [security_key(u32), reserved(u32), {addr(u32), value(u32)}...] + + For writes: addr has bit 31 set (0x80000000). + Minimum 2 register pairs per packet (padded if fewer). + """ + n = len(entries) + # Build request: pad to minimum 2 entries + padded = list(entries) + while len(padded) < 2: + padded.append((0, 0)) + + # Build packet + parts = [struct.pack(' None: + """ + Validate the echoed response against the sent request. + + Checks: + 1. Security key matches + 2. Reserved field matches + 3. For each register: address echoed with |0x70000000 + 4. For writes: value echoed back + """ + # Check security key and reserved field + req_key, req_reserved = struct.unpack_from(' None: + """Close the register interface.""" + self._socket.close() + + def __enter__(self): + return self + + def __exit__(self, *args): + self.close() diff --git a/src/team1k/detector/udp_transport.py b/src/team1k/detector/udp_transport.py new file mode 100644 index 0000000..0162e4c --- /dev/null +++ b/src/team1k/detector/udp_transport.py @@ -0,0 +1,128 @@ +""" +Low-level UDP socket operations for detector communication. + +Ports the socket management from NetworkInterface.cxx (lines 271-402). 
+""" + +import select +import socket +import struct +import logging + +logger = logging.getLogger(__name__) + +SECURITY_KEY = 0x203b2d29 # " ;-)" — security key and endianness marker + + +class UDPSocket: + """Manages a single UDP socket for communication with detector hardware.""" + + def __init__(self, dest_ip: str, dest_port: int, recv_buffer_size: int = 4096): + """ + Create a UDP socket bound to an OS-assigned local port. + + Args: + dest_ip: Destination IP address (detector). + dest_port: Destination UDP port. + recv_buffer_size: Kernel receive buffer size in bytes. + """ + self._dest_addr = (dest_ip, dest_port) + + self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + self._sock.bind(('', 0)) # INADDR_ANY, OS-assigned port + + # Set receive buffer size + try: + self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, recv_buffer_size) + except OSError as e: + logger.warning("Could not set SO_RCVBUF to %d: %s", recv_buffer_size, e) + + actual = self._sock.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF) + logger.debug("Receive buffer size: requested=%d, actual=%d", recv_buffer_size, actual) + + self._local_port = self._sock.getsockname()[1] + logger.debug("UDP socket bound to local port %d -> %s:%d", + self._local_port, dest_ip, dest_port) + + @property + def local_port(self) -> int: + """The OS-assigned local port number.""" + return self._local_port + + @property + def fileno(self) -> int: + """File descriptor for select().""" + return self._sock.fileno() + + def send(self, data: bytes | bytearray | memoryview) -> int: + """ + Send data to the configured destination. + + Returns: + Number of bytes sent. + + Raises: + OSError: On send failure. + """ + nsent = self._sock.sendto(data, self._dest_addr) + if nsent != len(data): + raise OSError(f"Sent {nsent} of {len(data)} bytes") + return nsent + + def recv(self, max_size: int, timeout_sec: float = 0.12) -> bytes | None: + """ + Receive data with timeout. 
+ + Args: + max_size: Maximum bytes to receive. + timeout_sec: Timeout in seconds (0.12 = 120ms, matching C++ default). + + Returns: + Received bytes, or None on timeout. + """ + ready, _, _ = select.select([self._sock], [], [], timeout_sec) + if not ready: + return None + data, _ = self._sock.recvfrom(max_size) + return data + + def recv_into(self, buffer: bytearray | memoryview, timeout_sec: float = 0.12) -> int: + """ + Receive data directly into a pre-allocated buffer (zero-copy). + + Args: + buffer: Buffer to receive into. + timeout_sec: Timeout in seconds. + + Returns: + Number of bytes received, 0 on timeout. + """ + ready, _, _ = select.select([self._sock], [], [], timeout_sec) + if not ready: + return 0 + return self._sock.recv_into(buffer) + + def clear_buffer(self) -> None: + """Drain all pending data from the socket (non-blocking).""" + logger.debug("Clearing socket buffer...") + self._sock.setblocking(False) + try: + while True: + try: + self._sock.recv(65536) + except BlockingIOError: + break + finally: + self._sock.setblocking(True) + + def close(self) -> None: + """Close the socket.""" + if self._sock: + self._sock.close() + self._sock = None + + def __enter__(self): + return self + + def __exit__(self, *args): + self.close() diff --git a/src/team1k/filewriter/__init__.py b/src/team1k/filewriter/__init__.py new file mode 100644 index 0000000..5360f82 --- /dev/null +++ b/src/team1k/filewriter/__init__.py @@ -0,0 +1 @@ +from .writer import FileWriter diff --git a/src/team1k/filewriter/writer.py b/src/team1k/filewriter/writer.py new file mode 100644 index 0000000..21296ea --- /dev/null +++ b/src/team1k/filewriter/writer.py @@ -0,0 +1,317 @@ +""" +File writing from shared memory ring buffer. + +Ports KTestFunctions::FileWritingThread from KTestFunctions.cxx (lines 1198-1241). +Supports both raw binary (backward compatible) and HDF5 formats. 
+""" + +import os +import struct +import shutil +import logging +import threading +from pathlib import Path + +import numpy as np + +from ..detector.chip_config import ChipConfig, TEAM1K_CONFIG + +logger = logging.getLogger(__name__) + +# Minimum free disk space in MB before stopping writes +_MIN_FREE_SPACE_MB = 256 + +# Scan number persistence file +_SCAN_NUMBER_FILE = "/usr/local/k_test/parameters/last_scan_number.dat" + + +class FileWriter: + """ + Reads frames from locking_shmring and writes to disk. + + Supports two formats: + - 'raw': Binary files matching the C++ k_test format (4-byte header + pixel data) + - 'hdf5': HDF5 files with chunked datasets and metadata + + Runs as a background thread in the main process. + """ + + def __init__(self, ring_name: str, chip_config: ChipConfig = TEAM1K_CONFIG): + self._ring_name = ring_name + self._config = chip_config + self._thread: threading.Thread | None = None + self._running = False + + # Configuration (can be changed while running) + self._enabled = threading.Event() + self._output_dir = "/data" + self._format = "raw" # or "hdf5" + self._scan_number = self._load_scan_number() + self._lock = threading.Lock() + + # Stats + self.frames_written: int = 0 + self.bytes_written: int = 0 + + def _load_scan_number(self) -> int: + """Load the last scan number from the persistent file.""" + try: + with open(_SCAN_NUMBER_FILE, 'rb') as f: + data = f.read(4) + if len(data) == 4: + return struct.unpack(' None: + """Save the current scan number to the persistent file.""" + try: + os.makedirs(os.path.dirname(_SCAN_NUMBER_FILE), exist_ok=True) + with open(_SCAN_NUMBER_FILE, 'wb') as f: + f.write(struct.pack(' int: + return self._scan_number + + @property + def output_dir(self) -> str: + return self._output_dir + + @property + def file_format(self) -> str: + return self._format + + @property + def is_enabled(self) -> bool: + return self._enabled.is_set() + + def enable(self, output_dir: str | None = None, file_format: str | None = 
None) -> None: + """ + Enable file writing. + + Args: + output_dir: Output directory (default: keep current). + file_format: File format, 'raw' or 'hdf5' (default: keep current). + """ + with self._lock: + if output_dir is not None: + self._output_dir = output_dir + if file_format is not None: + if file_format not in ('raw', 'hdf5'): + raise ValueError(f"Unsupported format: {file_format}") + self._format = file_format + + # Increment scan number + self._scan_number += 1 + self._save_scan_number() + + # Create scan directory + scan_dir = os.path.join(self._output_dir, + f"scan_{self._scan_number:010d}") + os.makedirs(scan_dir, exist_ok=True) + + self._enabled.set() + logger.info("File writing enabled: dir=%s, format=%s, scan=%d", + self._output_dir, self._format, self._scan_number) + + def disable(self) -> None: + """Disable file writing.""" + self._enabled.clear() + logger.info("File writing disabled") + + def start(self) -> None: + """Start the writer thread.""" + if self._thread and self._thread.is_alive(): + return + self._running = True + self._thread = threading.Thread(target=self._writer_loop, daemon=True, + name="team1k-filewriter") + self._thread.start() + logger.info("File writer thread started") + + def stop(self) -> None: + """Stop the writer thread.""" + self._running = False + self._enabled.clear() + if self._thread: + self._thread.join(timeout=5.0) + logger.info("File writer thread stopped") + + def _check_disk_space(self) -> bool: + """Check for sufficient free disk space.""" + try: + stat = shutil.disk_usage(self._output_dir) + free_mb = stat.free / (1024 * 1024) + if free_mb < _MIN_FREE_SPACE_MB: + logger.error("Disk almost full (%.0f MB free), stopping writes", free_mb) + return False + return True + except Exception as e: + logger.error("Could not check disk space: %s", e) + return False + + def _writer_loop(self) -> None: + """Main writer loop reading from shmring.""" + from locking_shmring import RingBufferReader + + reader = 
RingBufferReader(self._ring_name) + logger.info("File writer connected to ring buffer '%s'", self._ring_name) + + raw_fp = None + h5_file = None + h5_dataset = None + file_number = 0 + transfers_in_file = 0 + ntransfers_per_file = self._config.ntransfers_per_file + current_scan = 0 + + try: + for slot in reader.iter_frames(timeout_ms=200): + if not self._running: + slot.release() + break + + with slot: + if not self._enabled.is_set(): + # Close any open files + if raw_fp: + raw_fp.close() + raw_fp = None + if h5_file: + h5_file.close() + h5_file = None + h5_dataset = None + file_number = 0 + transfers_in_file = 0 + continue + + # Check if scan number changed (new enable call) + with self._lock: + scan = self._scan_number + fmt = self._format + out_dir = self._output_dir + + if scan != current_scan: + # Close old files + if raw_fp: + raw_fp.close() + raw_fp = None + if h5_file: + h5_file.close() + h5_file = None + h5_dataset = None + file_number = 0 + transfers_in_file = 0 + current_scan = scan + + # Check disk space periodically + if transfers_in_file == 0 and not self._check_disk_space(): + self._enabled.clear() + continue + + frame = slot.data_as_numpy( + (self._config.image_ny, self._config.image_nx), np.uint16 + ) + + if fmt == "raw": + raw_fp, file_number, transfers_in_file = self._write_raw( + frame, slot.frame_id, raw_fp, out_dir, scan, + file_number, transfers_in_file, ntransfers_per_file + ) + elif fmt == "hdf5": + h5_file, h5_dataset = self._write_hdf5( + frame, slot.frame_id, slot.timestamp_ns, + h5_file, h5_dataset, out_dir, scan, file_number + ) + + self.frames_written += 1 + self.bytes_written += self._config.image_size_bytes + + except Exception as e: + logger.error("File writer error: %s", e) + finally: + if raw_fp: + raw_fp.close() + if h5_file: + h5_file.close() + reader.close() + + def _write_raw(self, frame: np.ndarray, frame_id: int, + fp, out_dir: str, scan: int, + file_number: int, transfers_in_file: int, + ntransfers_per_file: int): + 
"""Write a frame in raw binary format (backward compatible with C++ k_test).""" + if transfers_in_file == 0: + if fp: + fp.close() + filename = os.path.join( + out_dir, + f"scan_{scan:010d}", + f"team1k_ued_{scan:010d}_{file_number:010d}.dat" + ) + fp = open(filename, 'wb') + # Write first image number header + fp.write(struct.pack('= ntransfers_per_file: + transfers_in_file = 0 + file_number += 1 + + return fp, file_number, transfers_in_file + + def _write_hdf5(self, frame: np.ndarray, frame_id: int, timestamp_ns: int, + h5_file, h5_dataset, out_dir: str, scan: int, file_number: int): + """Write a frame in HDF5 format.""" + try: + import h5py + except ImportError: + logger.error("h5py not installed, cannot write HDF5 files") + self._enabled.clear() + return None, None + + if h5_file is None: + filename = os.path.join( + out_dir, + f"scan_{scan:010d}", + f"team1k_ued_{scan:010d}_{file_number:010d}.h5" + ) + h5_file = h5py.File(filename, 'w') + h5_dataset = h5_file.create_dataset( + 'data', + shape=(0, self._config.image_ny, self._config.image_nx), + maxshape=(None, self._config.image_ny, self._config.image_nx), + dtype=np.uint16, + chunks=(1, self._config.image_ny, self._config.image_nx), + ) + h5_file.create_dataset('timestamps', shape=(0,), maxshape=(None,), + dtype=np.float64) + h5_file.create_dataset('frame_ids', shape=(0,), maxshape=(None,), + dtype=np.uint64) + h5_file.attrs['scan_number'] = scan + + # Append frame + n = h5_dataset.shape[0] + h5_dataset.resize(n + 1, axis=0) + h5_dataset[n] = frame + + ts_ds = h5_file['timestamps'] + ts_ds.resize(n + 1, axis=0) + ts_ds[n] = timestamp_ns / 1e9 + + fid_ds = h5_file['frame_ids'] + fid_ds.resize(n + 1, axis=0) + fid_ds[n] = frame_id + + h5_file.flush() + + return h5_file, h5_dataset diff --git a/src/team1k/peripherals/__init__.py b/src/team1k/peripherals/__init__.py new file mode 100644 index 0000000..5aef3ee --- /dev/null +++ b/src/team1k/peripherals/__init__.py @@ -0,0 +1,2 @@ +from .bellow_stage import 
BellowStage +from .power_supply import PowerSupply diff --git a/src/team1k/peripherals/bellow_stage.py b/src/team1k/peripherals/bellow_stage.py new file mode 100644 index 0000000..616483d --- /dev/null +++ b/src/team1k/peripherals/bellow_stage.py @@ -0,0 +1,92 @@ +""" +Camera bellow stage control via serial port. + +Extracted from the existing server.py. +""" + +import time +import logging + +import serial + +logger = logging.getLogger(__name__) + +DEFAULT_PORT = '/dev/CameraBellowStage' +DEFAULT_BAUDRATE = 9600 + + +class BellowStage: + """Controls the camera bellow stage motor via serial port.""" + + def __init__(self, port: str = DEFAULT_PORT, baudrate: int = DEFAULT_BAUDRATE): + self._port = port + self._baudrate = baudrate + self.position: int = 0 + self.is_moving: bool = False + + def move(self, insert: bool) -> tuple[bool, str | None]: + """ + Move the bellow stage. + + Args: + insert: True to insert, False to retract. + + Returns: + Tuple of (success, error_message). + """ + action = "Inserting" if insert else "Retracting" + logger.info("%s detector", action) + self.is_moving = True + + try: + with serial.Serial(self._port, self._baudrate, timeout=1) as ser: + time.sleep(0.5) + ser.write(b'\r') + time.sleep(0.5) + ser.write(b'\x03') # Ctrl-C to stop any ongoing movement + time.sleep(0.5) + + ser.write(b'rc=100\r') + time.sleep(0.5) + + if insert: + ser.write(b'vm=100000\r') + time.sleep(0.5) + ser.write(b'mr 2000000\r') + else: + ser.write(b'vm=100001\r') + time.sleep(0.5) + ser.write(b'mr -2000000\r') + + # Wait for movement + time.sleep(10) + + # Read position + ser.write(b'pr p\r') + time.sleep(0.5) + response_bytes = ser.read_all() + response = response_bytes.decode('utf-8', errors='ignore') if response_bytes else "" + + for line in response.splitlines(): + if line.strip().startswith('p='): + try: + self.position = int(line.strip()[2:]) + except ValueError: + pass + break + + self.is_moving = False + logger.info("Detector %s successfully (position=%d)", 
+ "inserted" if insert else "retracted", self.position) + return True, None + + except serial.SerialException as e: + self.is_moving = False + error = f"Serial port error: {e}" + logger.error(error) + return False, error + except Exception as e: + self.is_moving = False + error = f"Bellow stage error: {e}" + logger.error(error) + return False, error diff --git a/src/team1k/peripherals/power_supply.py b/src/team1k/peripherals/power_supply.py new file mode 100644 index 0000000..7351427 --- /dev/null +++ b/src/team1k/peripherals/power_supply.py @@ -0,0 +1,37 @@ +""" +Power supply control. + +Extracted from the existing server.py. +""" + +import logging + +logger = logging.getLogger(__name__) + + +class PowerSupply: + """Controls the detector power supply.""" + + def __init__(self): + self.is_on: bool = False + self.voltages: list[float] = [] + + def turn_on(self) -> tuple[bool, str | None]: + """Turn on the power supply.""" + # TODO: Implement actual power supply control + # The original server.py had placeholder serial port communication + self.is_on = True + logger.info("Power supply turned on") + return True, None + + def turn_off(self) -> tuple[bool, str | None]: + """Turn off the power supply.""" + self.is_on = False + logger.info("Power supply turned off") + return True, None + + def read_voltages(self) -> tuple[list[float] | None, str | None]: + """Read voltage values from the power supply.""" + # TODO: Implement actual voltage reading + logger.info("Reading power supply voltages") + return self.voltages, None diff --git a/src/team1k/pva/__init__.py b/src/team1k/pva/__init__.py new file mode 100644 index 0000000..11a5590 --- /dev/null +++ b/src/team1k/pva/__init__.py @@ -0,0 +1,2 @@ +from .interface import PVAInterface +from .streamer import PVAStreamer diff --git a/src/team1k/pva/interface.py b/src/team1k/pva/interface.py new file mode 100644 index 0000000..de6deca --- /dev/null +++ b/src/team1k/pva/interface.py @@ -0,0 +1,220 @@ +""" +PVA server setup, 
command PVs, and status PVs. + +Replaces all ZMQ communication (REQ/REP for commands, PUB for status) +with EPICS PV Access channels. +""" + +import logging +import threading +from typing import TYPE_CHECKING, Any + +import numpy as np + +from p4p import Value +from p4p.server import Server, ServerOperation +from p4p.server.thread import SharedPV +from p4p.nt import NTScalar, NTNDArray + +if TYPE_CHECKING: + from ..server import Team1kServer + +logger = logging.getLogger(__name__) + + +class _ReadOnlyHandler: + """Handler for read-only status PVs.""" + + def put(self, pv: SharedPV, op: ServerOperation): + op.done(error="PV is read-only") + + def rpc(self, pv: SharedPV, op: ServerOperation): + op.done(error="RPC not supported") + + +class _CommandHandler: + """ + Handler for writable command PVs. + + When a client puts a value, the handler dispatches to the server. + """ + + def __init__(self, server: 'Team1kServer', command_name: str): + self._server = server + self._command = command_name + + def put(self, pv: SharedPV, op: ServerOperation): + try: + raw = op.value() + # Extract the actual value from the NTScalar wrapper + value = raw.value if hasattr(raw, 'value') else raw + result = self._server.execute_command(self._command, value) + pv.post(raw) + op.done() + except Exception as e: + logger.error("Command '%s' failed: %s", self._command, e) + op.done(error=str(e)) + + +class PVAInterface: + """ + EPICS PV Access server for commands, status, and data streaming. 
+ + PV layout: + Status (read-only): + {prefix}STATUS - string array of status lines + {prefix}ACQUIRING - bool, DAQ running + {prefix}FRAME_RATE - float, current frame rate Hz + {prefix}FRAME_COUNT - int, total frames acquired + {prefix}EXPOSURE_MODE - int, current exposure mode + {prefix}TRIGGER_MODE - int, current trigger mode + {prefix}INTEGRATION_TIME - float, integration time ms + {prefix}FILE_WRITING - bool, file writing enabled + {prefix}OUTPUT_DIR - string, output directory + {prefix}SCAN_NUMBER - int, current scan number + + Commands (writable): + {prefix}CMD:EXPOSURE_MODE - int, set exposure mode + {prefix}CMD:TRIGGER_MODE - int, set trigger mode + {prefix}CMD:INTEGRATION_TIME - float, set integration time ms + {prefix}CMD:START_DAQ - int, 1=start, 0=stop + {prefix}CMD:FILE_WRITING - int, 1=enable, 0=disable + {prefix}CMD:OUTPUT_DIR - string, set output directory + {prefix}CMD:FILE_FORMAT - string, "raw" or "hdf5" + {prefix}CMD:ADC_CLOCK_FREQ - float, set ADC clock MHz + {prefix}CMD:ADC_DATA_DELAY - int, set ADC data delay + {prefix}CMD:PARAMETER_FILE - string, apply parameter file + {prefix}CMD:RESET - int, 1=reset detector connection + {prefix}CMD:TEST_MODE - int, 0/1 + {prefix}CMD:REGISTER_READ - int, read register at address + {prefix}CMD:REGISTER_WRITE - int array [address, value] + + Data: + {prefix}IMAGE - NTNDArray, live image stream + + Peripherals: + {prefix}BELLOW:CMD - int, 1=insert, 0=retract + {prefix}BELLOW:POSITION - int, current position + {prefix}POWER:CMD - int, 1=on, 0=off + {prefix}POWER:VOLTAGES - float array, voltage readings + """ + + def __init__(self, server: 'Team1kServer', prefix: str = "TEAM1K:"): + self._server = server + self._prefix = prefix + self._pvs: dict[str, SharedPV] = {} + self._pva_server: Server | None = None + self._ro_handler = _ReadOnlyHandler() + + @property + def image_pv(self) -> SharedPV: + return self._pvs[f"{self._prefix}IMAGE"] + + def _make_status_pv(self, nt, initial) -> SharedPV: + """Create a 
read-only status PV.""" + pv = SharedPV(handler=self._ro_handler, nt=nt, initial=initial) + return pv + + def _make_command_pv(self, nt, initial, command_name: str) -> SharedPV: + """Create a writable command PV.""" + handler = _CommandHandler(self._server, command_name) + pv = SharedPV(handler=handler, nt=nt, initial=initial) + return pv + + def setup(self) -> None: + """Create all PVs and start the PVA server.""" + p = self._prefix + + # --- Status PVs (read-only) --- + self._pvs[f"{p}STATUS"] = self._make_status_pv( + NTScalar('as'), ['INITIALIZING']) + self._pvs[f"{p}ACQUIRING"] = self._make_status_pv( + NTScalar('?'), False) + self._pvs[f"{p}FRAME_RATE"] = self._make_status_pv( + NTScalar('d'), 0.0) + self._pvs[f"{p}FRAME_COUNT"] = self._make_status_pv( + NTScalar('l'), 0) + self._pvs[f"{p}EXPOSURE_MODE"] = self._make_status_pv( + NTScalar('i'), 0) + self._pvs[f"{p}TRIGGER_MODE"] = self._make_status_pv( + NTScalar('i'), 0) + self._pvs[f"{p}INTEGRATION_TIME"] = self._make_status_pv( + NTScalar('d'), 0.0) + self._pvs[f"{p}FILE_WRITING"] = self._make_status_pv( + NTScalar('?'), False) + self._pvs[f"{p}OUTPUT_DIR"] = self._make_status_pv( + NTScalar('s'), "/data") + self._pvs[f"{p}SCAN_NUMBER"] = self._make_status_pv( + NTScalar('l'), 0) + + # --- Command PVs (writable) --- + self._pvs[f"{p}CMD:EXPOSURE_MODE"] = self._make_command_pv( + NTScalar('i'), 0, "set_exposure_mode") + self._pvs[f"{p}CMD:TRIGGER_MODE"] = self._make_command_pv( + NTScalar('i'), 0, "set_trigger_mode") + self._pvs[f"{p}CMD:INTEGRATION_TIME"] = self._make_command_pv( + NTScalar('d'), 0.0, "set_integration_time") + self._pvs[f"{p}CMD:START_DAQ"] = self._make_command_pv( + NTScalar('i'), 0, "start_stop_daq") + self._pvs[f"{p}CMD:FILE_WRITING"] = self._make_command_pv( + NTScalar('i'), 0, "set_file_writing") + self._pvs[f"{p}CMD:OUTPUT_DIR"] = self._make_command_pv( + NTScalar('s'), "", "set_output_dir") + self._pvs[f"{p}CMD:FILE_FORMAT"] = self._make_command_pv( + NTScalar('s'), "raw", 
"set_file_format") + self._pvs[f"{p}CMD:ADC_CLOCK_FREQ"] = self._make_command_pv( + NTScalar('d'), 60.0, "set_adc_clock_freq") + self._pvs[f"{p}CMD:ADC_DATA_DELAY"] = self._make_command_pv( + NTScalar('i'), 0, "set_adc_data_delay") + self._pvs[f"{p}CMD:PARAMETER_FILE"] = self._make_command_pv( + NTScalar('s'), "", "apply_parameter_file") + self._pvs[f"{p}CMD:RESET"] = self._make_command_pv( + NTScalar('i'), 0, "reset_connection") + self._pvs[f"{p}CMD:TEST_MODE"] = self._make_command_pv( + NTScalar('i'), 0, "set_test_mode") + + # --- Data PV --- + self._pvs[f"{p}IMAGE"] = SharedPV( + handler=self._ro_handler, + nt=NTNDArray(), + initial=np.zeros((self._server.chip_config.image_ny, + self._server.chip_config.image_nx), dtype=np.uint16), + ) + + # --- Peripheral PVs --- + self._pvs[f"{p}BELLOW:CMD"] = self._make_command_pv( + NTScalar('i'), 0, "bellow_move") + self._pvs[f"{p}BELLOW:POSITION"] = self._make_status_pv( + NTScalar('i'), 0) + self._pvs[f"{p}POWER:CMD"] = self._make_command_pv( + NTScalar('i'), 0, "power_control") + self._pvs[f"{p}POWER:VOLTAGES"] = self._make_status_pv( + NTScalar('ad'), np.zeros(0, dtype=np.float64)) + + # Start the PVA server + self._pva_server = Server(providers=[self._pvs]) + logger.info("PVA server started with %d PVs (prefix=%s)", + len(self._pvs), self._prefix) + + def post_status(self, name: str, value: Any) -> None: + """ + Update a status PV. + + Args: + name: PV name suffix (without prefix), e.g., 'FRAME_RATE'. + value: New value to post. 
+ """ + full_name = f"{self._prefix}{name}" + pv = self._pvs.get(full_name) + if pv: + try: + pv.post(value) + except Exception as e: + logger.debug("Could not post to %s: %s", full_name, e) + + def stop(self) -> None: + """Stop the PVA server.""" + if self._pva_server: + self._pva_server.stop() + self._pva_server = None + logger.info("PVA server stopped") diff --git a/src/team1k/pva/streamer.py b/src/team1k/pva/streamer.py new file mode 100644 index 0000000..cf7bd7e --- /dev/null +++ b/src/team1k/pva/streamer.py @@ -0,0 +1,130 @@ +""" +PVA data streamer — reads frames from shmring and posts to NTNDArray PV. + +Runs as a background thread in the main process. +""" + +import time +import logging +import threading + +import numpy as np + +from p4p import Value +from p4p.server.thread import SharedPV +from p4p.nt import NTNDArray + +from ..detector.chip_config import ChipConfig, TEAM1K_CONFIG + +logger = logging.getLogger(__name__) + + +class PVAStreamer: + """ + Reads frames from locking_shmring and posts to a PVA NTNDArray channel. + + The streamer naturally drops frames if the PVA channel is slower than + acquisition — the ring buffer reader simply skips ahead and reports + frames_skipped. 
+ """ + + def __init__(self, ring_name: str, image_pv: SharedPV, + chip_config: ChipConfig = TEAM1K_CONFIG): + self._ring_name = ring_name + self._image_pv = image_pv + self._config = chip_config + self._thread: threading.Thread | None = None + self._running = False + + # Stats + self.frame_count: int = 0 + self.frames_skipped: int = 0 + self.frame_rate: float = 0.0 + + # Callbacks for updating status PVs + self._frame_count_callback = None + self._frame_rate_callback = None + + def set_callbacks(self, frame_count_cb=None, frame_rate_cb=None): + """Set callbacks for updating external status PVs.""" + self._frame_count_callback = frame_count_cb + self._frame_rate_callback = frame_rate_cb + + def start(self) -> None: + """Start the streamer thread.""" + if self._thread and self._thread.is_alive(): + return + self._running = True + self._thread = threading.Thread(target=self._streamer_loop, daemon=True, + name="team1k-pva-streamer") + self._thread.start() + logger.info("PVA streamer thread started") + + def stop(self) -> None: + """Stop the streamer thread.""" + self._running = False + if self._thread: + self._thread.join(timeout=5.0) + logger.info("PVA streamer thread stopped") + + def _streamer_loop(self) -> None: + """Main loop reading from shmring and posting to PVA.""" + from locking_shmring import RingBufferReader + + reader = RingBufferReader(self._ring_name) + logger.info("PVA streamer connected to ring buffer '%s'", self._ring_name) + + last_time = time.monotonic() + frames_since_last = 0 + nt = NTNDArray() + + try: + for slot in reader.iter_frames(timeout_ms=200): + if not self._running: + slot.release() + break + + with slot: + self.frame_count += 1 + self.frames_skipped += slot.frames_skipped + frames_since_last += 1 + + # Get numpy view of frame data + frame = slot.data_as_numpy( + (self._config.image_ny, self._config.image_nx), np.uint16 + ) + + # Build NTNDArray value and post + height, width = frame.shape + frame_array = { + 'value': frame.ravel(), + 
'dimension': [ + {'size': height, 'offset': 0, 'fullSize': height, + 'binning': 1, 'reverse': False}, + {'size': width, 'offset': 0, 'fullSize': width, + 'binning': 1, 'reverse': False}, + ], + } + + try: + self._image_pv.post(Value(nt, frame_array)) + except Exception as e: + logger.debug("PVA post error (no clients?): %s", e) + + # Update frame rate every second + now = time.monotonic() + dt = now - last_time + if dt >= 1.0: + self.frame_rate = frames_since_last / dt + frames_since_last = 0 + last_time = now + + if self._frame_rate_callback: + self._frame_rate_callback(self.frame_rate) + if self._frame_count_callback: + self._frame_count_callback(self.frame_count) + + except Exception as e: + logger.error("PVA streamer error: %s", e) + finally: + reader.close() diff --git a/src/team1k/server.py b/src/team1k/server.py index 3587402..eac9379 100644 --- a/src/team1k/server.py +++ b/src/team1k/server.py @@ -1,1196 +1,432 @@ #!/usr/bin/env python3 """ -Team1k Server - Controls the Team1k X-ray detector system. +Team1k Server — Controls the Team1k X-ray detector system. -This server manages: -1. A camera bellow stage (linear motion) -2. A power supply for the detector -3. The detector itself (via KTest commands) -4. Data streaming from the detector to EPICS PVA +Communicates directly with the detector via UDP (replacing k_test C++ program). +Uses a dedicated subprocess for high-throughput data acquisition, shared memory +ring buffer for data transfer, and EPICS PV Access for all client communication. -It provides both command-response (REP) and publish-subscribe (PUB) interfaces. 
+Architecture: + Main process: + - Detector register read/write (UDP port 42000) + - PVA server (commands, status, data) + - PVA streamer thread (shmring -> NTNDArray) + - File writer thread (shmring -> raw/HDF5 files) + - Peripheral control (bellow stage, power supply) + + Acquisition subprocess: + - UDP data reception (port 41000) + - Frame assembly from packets + - Writes to locking_shmring """ -import os import sys -import enum import time -import queue import signal -import socket -import asyncio import logging -import pickle -import importlib -import dataclasses +import argparse import threading -from pathlib import Path -from typing import Dict, List, Any, Optional, Tuple, Union, Callable, Awaitable, TypeVar, Generic -import numpy as np +from typing import Any -import zmq -import zmq.asyncio -import serial -import psutil -import subprocess +from .detector.registers import RegisterInterface +from .detector.commands import DetectorCommands +from .detector.adc import ADCController +from .detector.chip_config import TEAM1K_CONFIG, configure_chip +from .detector.parameter_file import apply_parameter_file -from p4p import Value, Type -from p4p.server import Server, ServerOperation -from p4p.server.thread import SharedPV -from p4p.nt import NTScalar, NTNDArray +from .acquisition.receiver import AcquisitionProcess +from .filewriter.writer import FileWriter +from .pva.interface import PVAInterface +from .pva.streamer import PVAStreamer +from .peripherals.bellow_stage import BellowStage +from .peripherals.power_supply import PowerSupply -from .KTestClient import KTestClient, KTestCommand, KTestError, KTEST_COMMAND_ARGS, ExposureModes, TriggerModes +logger = logging.getLogger(__name__) -# Configuration paths -SW_PATH = "/home/daq-user/Team1k_ANL_UED_Camera/sw" -K_TEST_CONFIG_PATH = "/home/daq-user/Team1k_ANL_UED_Camera/configuration/parameterfile_Team1k_UED_ANL.txt" +# Default configuration +DEFAULT_DETECTOR_IP = "10.0.0.32" +DEFAULT_REGISTER_PORT = 42000 
+DEFAULT_DATA_PORT = 41000 +DEFAULT_PV_PREFIX = "TEAM1K:" +DEFAULT_BELLOW_PORT = "/dev/CameraBellowStage" -# Serial port for the camera bellow stage -BELLOW_STAGE_PORT = '/dev/CameraBellowStage' -BELLOW_STAGE_BAUDRATE = 9600 - -# ZMQ ports -CMD_PORT = 42003 # Command-response socket -PUB_PORT = 42004 # Status publication socket -DATA_PORT = 42005 # Data socket for NetworkInterface.cxx - -# Status update interval in seconds -STATUS_UPDATE_INTERVAL = 1.0 - -# Configure logging -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - datefmt='%Y-%m-%d %H:%M:%S' -) - -#--------------------------------------------------------------------------- -# Commands and status enums -#--------------------------------------------------------------------------- - -class Commands(enum.Enum): - """Commands that can be sent to the Team1kServer.""" - - # Device control commands - INSERT = "insert" - """Insert the detector into the beam.""" - - RETRACT = "retract" - """Retract the detector from the beam.""" - - POWER_SUPPLY_ON = "power_on" - """Turn the detector power supply on.""" - - POWER_SUPPLY_OFF = "power_off" - """Turn the detector power supply off.""" - - READ_VOLTAGES = "read_voltages" - """Read the voltage values from the power supply.""" - - # Server control commands - KILL = "kill" - """Kill this server.""" - - GET_STATUS = "get_status" - """Get the current status of all devices.""" - - # Stream processor commands - ADD_STREAM = "add_stream" - """Add a new data stream processor.""" - - REMOVE_STREAM = "remove_stream" - """Remove a data stream processor.""" - - LIST_STREAMS = "list_streams" - """List all active stream processors.""" - - -class DeviceType(enum.Enum): - """Types of devices managed by the server.""" - BELLOW_STAGE = "bellow_stage" - POWER_SUPPLY = "power_supply" - DETECTOR = "detector" - SERVER = "server" - - -class DeviceStatus(enum.Enum): - """Status of a device.""" - UNKNOWN = "unknown" - READY = "ready" - 
BUSY = "busy" - ERROR = "error" - DISCONNECTED = "disconnected" - - -@dataclasses.dataclass -class BellowStageState: - """State of the camera bellow stage.""" - status: DeviceStatus = DeviceStatus.UNKNOWN - position: int = 0 - is_inserted: bool = False - is_moving: bool = False - error_message: Optional[str] = None - - -@dataclasses.dataclass -class PowerSupplyState: - """State of the power supply.""" - status: DeviceStatus = DeviceStatus.UNKNOWN - is_on: bool = False - voltage_values: List[float] = dataclasses.field(default_factory=lambda: [0.0, 0.0, 0.0]) - error_message: Optional[str] = None - - -@dataclasses.dataclass -class DetectorState: - """State of the detector.""" - status: DeviceStatus = DeviceStatus.UNKNOWN - exposure_mode: Optional[ExposureModes] = None - trigger_mode: Optional[TriggerModes] = None - integration_time: float = 0.0 - is_acquiring: bool = False - error_message: Optional[str] = None - - -@dataclasses.dataclass -class SystemState: - """Overall state of the Team1k system.""" - bellow_stage: BellowStageState = dataclasses.field(default_factory=BellowStageState) - power_supply: PowerSupplyState = dataclasses.field(default_factory=PowerSupplyState) - detector: DetectorState = dataclasses.field(default_factory=DetectorState) - server_uptime: float = 0.0 - start_time: float = dataclasses.field(default_factory=time.time) - - @property - def as_dict(self) -> Dict[str, Any]: - """Convert the state to a dictionary.""" - return { - 'bellow_stage': dataclasses.asdict(self.bellow_stage), - 'power_supply': dataclasses.asdict(self.power_supply), - 'detector': dataclasses.asdict(self.detector), - 'server_uptime': time.time() - self.start_time - } - - -#--------------------------------------------------------------------------- -# Utility functions -#--------------------------------------------------------------------------- - -def parse_command(command_str: str) -> Tuple[Union[Commands, KTestCommand], List[Any]]: - """ - Parse a command string into a 
command enum and arguments. - - Args: - command_str: The command string to parse - - Returns: - A tuple of (command, args) where command is a Commands or KTestCommand enum - and args is a list of arguments - """ - parts = command_str.strip().lower().split() - if not parts: - raise ValueError("Empty command") - - cmd = parts[0] - args = parts[1:] - - # Try to match with server commands first - try: - return Commands(cmd), args - except ValueError: - pass - - # Try to match with KTest commands - try: - return KTestCommand(cmd.upper()), args - except ValueError: - raise ValueError(f"Unknown command: {cmd}") - - -def load_processor_function(processor_info: str) -> Callable: - """ - Dynamically load a processor function from a module. - - Args: - processor_info: String in format "module_name:function_name" - - Returns: - The loaded function - - Raises: - ValueError: If the processor_info is invalid or the function cannot be loaded - """ - module_name = None - func_name = None - - try: - parts = processor_info.split(':', 1) - if len(parts) != 2: - raise ValueError(f"Invalid processor format. Expected 'module:function', got '{processor_info}'") - - module_name, func_name = parts - module = importlib.import_module(module_name) - function = getattr(module, func_name) - - # Validate it's callable - if not callable(function): - raise ValueError(f"{func_name} is not callable") - - return function - except ImportError as e: - raise ValueError(f"Could not import module '{module_name}': {e}") - except AttributeError: - raise ValueError(f"Function '{func_name}' not found in module '{module_name}'") - except Exception as e: - raise ValueError(f"Error loading processor function: {e}") - -def parse_args(command: Union[Commands, KTestCommand], args: List[str]) -> List[Any]: - """ - Parse and validate command arguments. 
- - Args: - command: The command enum - args: The string arguments to parse - - Returns: - A list of parsed arguments - """ - if isinstance(command, Commands): - # Server commands with no arguments - if command in (Commands.INSERT, Commands.RETRACT, Commands.POWER_SUPPLY_ON, - Commands.POWER_SUPPLY_OFF, Commands.READ_VOLTAGES, Commands.KILL, - Commands.GET_STATUS, Commands.LIST_STREAMS): - if args: - raise ValueError(f"Command {command.value} takes no arguments") - return [] - - # Stream processor commands - elif command == Commands.ADD_STREAM: - if len(args) < 3: - raise ValueError(f"Command {command.value} requires at least port, pva_name, and dimensions") - - # Parse port - try: - port = int(args[0]) - except ValueError: - raise ValueError(f"Invalid port number: {args[0]}") - - # PVA name - pva_name = args[1] - - # Dimensions - comma-separated list of integers - try: - dimensions = tuple(int(d) for d in args[2].split(',')) - if len(dimensions) != 2: - raise ValueError(f"Dimensions must be two integers (width,height)") - except ValueError as e: - raise ValueError(f"Invalid dimensions: {e}") - - # Optional callback module and function - callback_info = None - if len(args) > 3: - callback_info = args[3] - - return [port, pva_name, dimensions, callback_info] - - elif command == Commands.REMOVE_STREAM: - if len(args) != 1: - raise ValueError(f"Command {command.value} requires a PVA name") - return [args[0]] - - else: - raise ValueError(f"Unknown server command: {command.value}") - else: - # KTest commands - expected_types = KTEST_COMMAND_ARGS.get(command, ()) - if len(args) != len(expected_types): - raise ValueError(f"Expected {len(expected_types)} arguments for {command.value}, got {len(args)}") - - parsed_args = [] - for i, (arg, arg_type) in enumerate(zip(args, expected_types)): - try: - if arg_type == bool: - # Handle bool conversion specially - if arg.lower() in ('true', '1', 'yes'): - parsed_args.append(True) - elif arg.lower() in ('false', '0', 'no'): - 
parsed_args.append(False) - else: - raise ValueError(f"Invalid boolean value: {arg}") - elif issubclass(arg_type, enum.Enum): - # Handle enum conversion - try: - parsed_args.append(arg_type(int(arg))) - except ValueError: - raise ValueError(f"Invalid enum value for {arg_type.__name__}: {arg}") - else: - parsed_args.append(arg_type(arg)) - except ValueError as e: - raise ValueError(f"Invalid argument {i+1} for {command.value}: {e}") - - return parsed_args - - -#--------------------------------------------------------------------------- -# Main server class -#--------------------------------------------------------------------------- class Team1kServer: """ - Server to control the Team1k detector system. - - Provides: - - ZMQ REP socket for command-response - - ZMQ PUB socket for status updates - - EPICS PV Access for data streaming + Main server coordinating all subsystems. + + Manages direct UDP communication with the detector, data acquisition + via a dedicated subprocess, PVA streaming, and optional file writing. """ - - def __init__(self, cmd_port=CMD_PORT, pub_port=PUB_PORT, data_port=DATA_PORT): - """ - Initialize the Team1k server. 
- - Args: - cmd_port: Port for the ZMQ REP socket (commands) - pub_port: Port for the ZMQ PUB socket (status updates) - data_port: Port for the ZMQ data socket (from NetworkInterface.cxx) - """ - self.cmd_port = cmd_port - self.pub_port = pub_port + + def __init__(self, detector_ip: str = DEFAULT_DETECTOR_IP, + register_port: int = DEFAULT_REGISTER_PORT, + data_port: int = DEFAULT_DATA_PORT, + pv_prefix: str = DEFAULT_PV_PREFIX, + config_file: str | None = None, + bellow_port: str = DEFAULT_BELLOW_PORT): + + self.detector_ip = detector_ip + self.register_port = register_port self.data_port = data_port - - # Setup logging - self.logger = logging.getLogger("Team1kServer") - - # Initialize ZMQ - self.context = zmq.asyncio.Context() - - # Command socket (REP) - self.cmd_socket = self.context.socket(zmq.REP) - self.cmd_socket.bind(f"tcp://*:{self.cmd_port}") - - # Status publication socket (PUB) - self.pub_socket = self.context.socket(zmq.PUB) - self.pub_socket.bind(f"tcp://*:{self.pub_port}") - - # Data socket (SUB) - connects to NetworkInterface.cxx - self.data_socket = self.context.socket(zmq.SUB) - self.data_socket.setsockopt_string(zmq.SUBSCRIBE, '') - self.data_socket.connect(f"tcp://localhost:{self.data_port}") - - # Data distribution - self.raw_data_queues = {} # {name: queue.Queue} - self.data_streams = {} # {pva_name: DataStream} - - # System state - self.state = SystemState() - - # Asyncio event loop and tasks - self._k_test_process = None - self.running = False - self._tasks = [] - - # EPICS PVA server - self.pva_server = None - self.pv_dict = {} # Dictionary of all active PVs - self.setup_pva_server() - - @dataclasses.dataclass - class DataStream: - """Configuration for a PVA data stream.""" - pva_name: str - data_type: str # 'NTNDArray' or 'ScalarArray' - config: Dict[str, Any] # Configuration for the data type - queue: queue.Queue - pv: Optional[SharedPV] = None - connected_clients: int = 0 - running: bool = False - task: Optional[asyncio.Task] = None - - 
def get_raw_data_queue(self, name: str, maxsize: int = 10) -> queue.Queue: - """ - Get a queue that will receive raw data frames. - - Args: - name: Unique name for this queue (used for identification) - maxsize: Maximum queue size (older items will be dropped when full) - - Returns: - A queue object that will receive raw data frames - """ - if name in self.raw_data_queues: - return self.raw_data_queues[name] - - q = queue.Queue(maxsize=maxsize) - self.raw_data_queues[name] = q - self.logger.info(f"Created raw data queue: {name}") - return q - - def add_data_stream(self, name: str, data_type: str, config: Dict[str, Any], data_queue: queue.Queue) -> None: - """ - Add a data stream that will publish data from the queue to a PVA channel. - - Args: - name: The PVA name for this stream - data_type: Type of data ('NTNDArray' or 'ScalarArray') - config: Configuration for the data type - data_queue: Queue that will provide data to be published - """ - if name in self.data_streams: - raise ValueError(f"Data stream '{name}' already exists") - - # Validate data type - if data_type not in ('NTNDArray', 'ScalarArray'): - raise ValueError(f"Unsupported data type: {data_type}") - - # Create the appropriate PV based on type - if data_type == 'NTNDArray': - # For image data - dimensions = config.get('dimensions', (1024, 1024)) - pv = SharedPV( - handler=PVAHandler(self, pva_name=name), - nt=NTNDArray(), - initial=np.full((1024, 1024), 0) - ) - elif data_type == 'ScalarArray': - # For array data - array_type = config.get('type', 'd') # Default to double - length = config.get('length', 1) - pv = SharedPV( - handler=PVAHandler(self, pva_name=name), - nt=NTScalar('a' + array_type), - initial=Value(NTScalar.buildType('a' + array_type), {'value': np.zeros(length)}) - ) - pv.open(np.zeros(length)) - - # Create DataStream object - data_stream = self.DataStream( - pva_name=name, - data_type=data_type, - config=config, - queue=data_queue, - pv=pv - ) - - # Add to our registry - 
self.data_streams[name] = data_stream - - # Update PV dictionary and recreate server - self.pv_dict[name] = pv - if self.pva_server: - self.pva_server.stop() - self.pva_server = Server([self.pv_dict]) - - self.logger.info(f"Added data stream: {name} ({data_type})") - - def setup_pva_server(self): - """Set up the EPICS PV Access server.""" - - # Create only the status PV initially - self.status_pv = SharedPV( - nt=NTScalar('as'), - value=['INITIALIZING'] - ) - self.status_pv.open(["INITIALIZING"]) - - # Initialize the PV dictionary with just the status PV - self.pv_dict = {'TEAM1K:STATUS': self.status_pv} - - # Create server with the PV dictionary - self.pva_server = Server(providers=[self.pv_dict]) - - # Create raw data queue for the default stream - raw_queue = self.get_raw_data_queue('default', maxsize=30) - - # Add default raw data stream for backward compatibility - self.add_data_stream( - 'TEAM1K:RAW', - 'NTNDArray', - {'dimensions': (1024, 1024)}, - raw_queue + self.chip_config = TEAM1K_CONFIG + self._config_file = config_file + self._shutdown_event = threading.Event() + self._acquiring = False + + # Detector communication (main process, register port) + self.registers = RegisterInterface(detector_ip, register_port) + self.commands = DetectorCommands(self.registers) + self.adc = ADCController(self.registers) + + # Acquisition subprocess + self.acquisition = AcquisitionProcess( + detector_ip, data_port, + ring_name="team1k_frames", + num_ring_slots=32, + chip_config=self.chip_config, ) - #----------------------------------------------------------------------- - # Device control methods - #----------------------------------------------------------------------- - - @property - def k_client(self) -> KTestClient: - """ - Get or create a KTestClient instance. 
- - Returns: - A KTestClient connected to the detector - """ - if not hasattr(self, '_k_client'): - # Check if k_test server is running on localhost - if not any("k_test" in p.info['name'] for p in psutil.process_iter(['name'])): - # Start k_test server if not running - self.logger.info("Starting k_test server on localhost") - k_test_cmd = ["k_test", K_TEST_CONFIG_PATH] - self.logger.debug(f"Running command: {' '.join(k_test_cmd)} in {SW_PATH}") - self._k_test_process = subprocess.Popen(k_test_cmd, cwd=SW_PATH) - time.sleep(1) # Give it time to start + # PVA interface + self.pva = PVAInterface(self, prefix=pv_prefix) - # Create the client - self._k_client = KTestClient("localhost") - - # Update detector status - self.state.detector.status = DeviceStatus.READY - - return self._k_client - - async def move_bellow_stage(self, insert: bool) -> Tuple[bool, Optional[str]]: + # File writer (thread in main process) + self.file_writer = FileWriter("team1k_frames", self.chip_config) + + # PVA streamer will be created after PVA setup + self._pva_streamer: PVAStreamer | None = None + + # Peripherals + self.bellow_stage = BellowStage(bellow_port) + self.power_supply = PowerSupply() + + def initialize(self) -> None: """ - Move the camera bellow stage in or out. - - Args: - insert: True to insert the detector, False to retract it - - Returns: - A tuple of (success, error_message) + Initialize the detector. + + Equivalent to the KTestFunctions constructor sequence: + 1. Read firmware version + 2. Configure chip type + 3. Set default exposure mode, trigger, integration time + 4. Set ADC clock frequency + 5. Set ADC data delay + 6. 
Optionally apply parameter file """ - action = "Inserting" if insert else "Retracting" - self.logger.info(f"{action} detector") - - # Update state - self.state.bellow_stage.is_moving = True - self.state.bellow_stage.status = DeviceStatus.BUSY - await self.publish_status() - + logger.info("Initializing detector at %s:%d", self.detector_ip, self.register_port) + + # Read firmware version try: - with serial.Serial(BELLOW_STAGE_PORT, BELLOW_STAGE_BAUDRATE, timeout=1) as ser: - # Initialize communication - await asyncio.sleep(0.5) - ser.write(b'\r') - await asyncio.sleep(0.5) - ser.write(b'\x03') # Ctrl-C to stop any ongoing movement - await asyncio.sleep(0.5) - - # Set speed - ser.write(b'rc=100\r') - await asyncio.sleep(0.5) - - # Move to position - if insert: - ser.write(b'vm=100000\r') # Max speed for insertion - await asyncio.sleep(0.5) - ser.write(b'mr 2000000\r') # Move relative - else: - ser.write(b'vm=100001\r') # Max speed for retraction - await asyncio.sleep(0.5) - ser.write(b'mr -2000000\r') # Move relative - - # Wait for movement to complete (could be improved with polling) - await asyncio.sleep(10) - - # Get final position - ser.write(b'pr p\r') # Print position - await asyncio.sleep(0.5) - response_bytes = ser.read_all() - response = response_bytes.decode('utf-8', errors='ignore') if response_bytes else "" - - # Parse position from response - try: - # Extract position value (assuming format like "p=12345") - for line in response.splitlines(): - if line.strip().startswith('p='): - position = int(line.strip()[2:]) - self.state.bellow_stage.position = position - break - except Exception as e: - self.logger.warning(f"Failed to parse position: {e}") - - # Update state - self.state.bellow_stage.is_inserted = insert - self.state.bellow_stage.is_moving = False - self.state.bellow_stage.status = DeviceStatus.READY - self.state.bellow_stage.error_message = None - - return True, None - - except serial.SerialTimeoutException as e: - error_msg = f"Serial timeout: {e}" 
- self.logger.error(error_msg) - self.state.bellow_stage.status = DeviceStatus.ERROR - self.state.bellow_stage.is_moving = False - self.state.bellow_stage.error_message = error_msg - return False, error_msg - - except serial.SerialException as e: - error_msg = f"Serial error: {e}" - self.logger.error(error_msg) - self.state.bellow_stage.status = DeviceStatus.ERROR - self.state.bellow_stage.is_moving = False - self.state.bellow_stage.error_message = error_msg - return False, error_msg - + version = self.commands.read_firmware_version() + logger.info("Firmware version: %d", version) except Exception as e: - error_msg = f"Unexpected error: {e}" - self.logger.error(error_msg) - self.state.bellow_stage.status = DeviceStatus.ERROR - self.state.bellow_stage.is_moving = False - self.state.bellow_stage.error_message = error_msg - return False, error_msg - - async def power_supply_control(self, turn_on: bool) -> Tuple[bool, Optional[str]]: + logger.warning("Could not read firmware version: %s", e) + + # Configure chip + configure_chip(self.registers, self.chip_config) + + # Set defaults (matching KTestFunctions constructor) + self.commands.set_exposure_mode(3) # Global shutter CDS + self.commands.set_trigger_mode(False) # Internal trigger + self.commands.set_integration_time(6.0) # 6 ms + self.adc.set_clock_freq(60.0) # 60 MHz + self.commands.set_adc_data_delay(0x1A7) + self.commands.set_adc_data_averaging(0) + self.commands.enable_fpga_test_data(False) + + # Apply parameter file if provided + if self._config_file: + ok, msg = apply_parameter_file( + self._config_file, self.commands, self.adc, self.registers, + restart_daq_callback=lambda: self._restart_daq() + ) + if not ok: + logger.error("Parameter file error:\n%s", msg) + else: + logger.info("Parameter file applied:\n%s", msg) + + logger.info("Detector initialized") + + def execute_command(self, command: str, value: Any) -> Any: """ - Turn the power supply on or off. 
- - Args: - turn_on: True to turn on, False to turn off - - Returns: - A tuple of (success, error_message) + Dispatch a command from a PVA put handler. + + This is called from p4p's internal thread, so it must be thread-safe. + Register operations are protected by the RegisterInterface's lock. """ - action = "Turning on" if turn_on else "Turning off" - self.logger.info(f"{action} power supply") - - # Update state - self.state.power_supply.status = DeviceStatus.BUSY - await self.publish_status() - - try: - # Placeholder for actual power supply control - # Replace with your actual implementation - await asyncio.sleep(1) - - # Update state with the result - self.state.power_supply.is_on = turn_on - self.state.power_supply.status = DeviceStatus.READY - self.state.power_supply.error_message = None - - return True, None - - except Exception as e: - error_msg = f"Power supply error: {e}" - self.logger.error(error_msg) - self.state.power_supply.status = DeviceStatus.ERROR - self.state.power_supply.error_message = error_msg - return False, error_msg - - async def read_power_supply_voltages(self) -> Tuple[List[float], Optional[str]]: - """ - Read the voltage values from the power supply. 
- - Returns: - A tuple of (voltage_values, error_message) - """ - self.logger.info("Reading power supply voltages") - - try: - # Placeholder for actual voltage reading - # Replace with your actual implementation - voltage_values = [3.3, 5.0, 12.0] # Example values - - # Update state - self.state.power_supply.voltage_values = voltage_values - self.state.power_supply.status = DeviceStatus.READY - self.state.power_supply.error_message = None - - return voltage_values, None - - except Exception as e: - error_msg = f"Error reading voltages: {e}" - self.logger.error(error_msg) - self.state.power_supply.status = DeviceStatus.ERROR - self.state.power_supply.error_message = error_msg - return [], error_msg - - async def execute_ktest_command(self, command: KTestCommand, args: List[Any]) -> Tuple[Any, Optional[str]]: - """ - Execute a KTest command on the detector. - - Args: - command: The KTest command to execute - args: The arguments for the command - - Returns: - A tuple of (result, error_message) - """ - self.logger.info(f"Executing KTest command: {command.value} {args}") - - try: - # Update detector state - self.state.detector.status = DeviceStatus.BUSY - await self.publish_status() - - # Execute command - result = self.k_client.send_command(command, *args) - - # Update state based on command - if command == KTestCommand.SET_EXPOSURE_MODE and args: - self.state.detector.exposure_mode = args[0] - elif command == KTestCommand.SET_TRIGGER_MODE and args: - self.state.detector.trigger_mode = args[0] - elif command == KTestCommand.SET_INTEGRATION_TIME and args: - self.state.detector.integration_time = args[0] - elif command == KTestCommand.RESTART_DAQ: - self.state.detector.is_acquiring = True - elif command == KTestCommand.STOP_DAQ: - self.state.detector.is_acquiring = False - - self.state.detector.status = DeviceStatus.READY - self.state.detector.error_message = None - - return result, None - - except KTestError as e: - error_msg = f"KTest error: {e}" - 
self.logger.error(error_msg) - self.state.detector.status = DeviceStatus.ERROR - self.state.detector.error_message = error_msg - return None, error_msg - - except Exception as e: - error_msg = f"Unexpected error: {e}" - self.logger.error(error_msg) - self.state.detector.status = DeviceStatus.ERROR - self.state.detector.error_message = error_msg - return None, error_msg - - #----------------------------------------------------------------------- - # Status publication - #----------------------------------------------------------------------- - - async def publish_status(self) -> None: - """Publish current status to PUB socket and EPICS PV.""" - # Update the state data - state_dict = self.state.as_dict - - # Create a status message with type indicator - status_message = { - 'type': 'status', - 'status': state_dict, - 'timestamp': time.time() - } - - # Publish to ZMQ PUB socket - await self.pub_socket.send_pyobj(status_message) - - # Update EPICS PV - if self.pva_server: - status_items = [] - for device, state in state_dict.items(): - if isinstance(state, dict) and 'status' in state: - status_items.append(f"{device}: {state['status']}") - self.status_pv.post(Value(self.status_pv.nt.type, {'value': status_items})) - - #----------------------------------------------------------------------- - # Command handling - #----------------------------------------------------------------------- - - async def handle_command(self, command_str: str) -> Any: - """ - Handle a command string. 
- - Args: - command_str: The command string to handle - - Returns: - The response from the command handler - - Raises: - ValueError: If the command is invalid or arguments are incorrect - Exception: For other errors - """ - # Parse the command - command, arg_strings = parse_command(command_str) - - # Parse arguments - args = parse_args(command, arg_strings) - - # Handle server commands - if isinstance(command, Commands): - return await self.handle_server_command(command, args) - - # Handle KTest commands - return await self.execute_ktest_command(command, args) - - async def handle_server_command(self, command: Commands, args: List[Any]) -> Any: - """ - Handle a server command. - - Args: - command: The command to handle - args: The parsed arguments - - Returns: - A response containing the command result - - Raises: - ValueError: For invalid commands - RuntimeError: For execution errors - """ - if command == Commands.INSERT: - success, error = await self.move_bellow_stage(insert=True) + logger.info("Executing command: %s = %s", command, value) + + if command == "set_exposure_mode": + self.commands.set_exposure_mode(int(value)) + self.pva.post_status("EXPOSURE_MODE", int(value)) + + elif command == "set_trigger_mode": + self.commands.set_trigger_mode(external=bool(int(value))) + self.pva.post_status("TRIGGER_MODE", int(value)) + + elif command == "set_integration_time": + self.commands.set_integration_time(float(value)) + self.pva.post_status("INTEGRATION_TIME", float(value)) + + elif command == "start_stop_daq": + if int(value): + self._start_daq() + else: + self._stop_daq() + + elif command == "set_file_writing": + if int(value): + self.file_writer.enable() + self.pva.post_status("FILE_WRITING", True) + self.pva.post_status("SCAN_NUMBER", self.file_writer.scan_number) + else: + self.file_writer.disable() + self.pva.post_status("FILE_WRITING", False) + + elif command == "set_output_dir": + self.file_writer._output_dir = str(value) + self.pva.post_status("OUTPUT_DIR", 
str(value)) + + elif command == "set_file_format": + fmt = str(value) + if fmt not in ("raw", "hdf5"): + raise ValueError(f"Unsupported format: {fmt}") + self.file_writer._format = fmt + + elif command == "set_adc_clock_freq": + self.adc.set_clock_freq(float(value)) + + elif command == "set_adc_data_delay": + self.commands.set_adc_data_delay(int(value)) + + elif command == "apply_parameter_file": + ok, msg = apply_parameter_file( + str(value), self.commands, self.adc, self.registers, + restart_daq_callback=lambda: self._restart_daq() + ) + if not ok: + raise RuntimeError(msg) + self.pva.post_status("STATUS", [msg]) + + elif command == "reset_connection": + if int(value): + self._stop_daq() + self.registers.close() + self.registers = RegisterInterface(self.detector_ip, self.register_port) + self.commands = DetectorCommands(self.registers) + self.adc = ADCController(self.registers) + self.initialize() + + elif command == "set_test_mode": + self.commands.enable_fpga_test_data(bool(int(value))) + + elif command == "bellow_move": + insert = bool(int(value)) + success, error = self.bellow_stage.move(insert) if not success: - raise RuntimeError(f"Failed to insert detector: {error}") - await self.publish_status() - return "Detector inserted successfully" - - elif command == Commands.RETRACT: - success, error = await self.move_bellow_stage(insert=False) + raise RuntimeError(error) + self.pva.post_status("BELLOW:POSITION", self.bellow_stage.position) + + elif command == "power_control": + if int(value): + success, error = self.power_supply.turn_on() + else: + success, error = self.power_supply.turn_off() if not success: - raise RuntimeError(f"Failed to retract detector: {error}") - await self.publish_status() - return "Detector retracted successfully" - - elif command == Commands.POWER_SUPPLY_ON: - success, error = await self.power_supply_control(turn_on=True) - if not success: - raise RuntimeError(f"Failed to turn on power supply: {error}") - await self.publish_status() - 
return "Power supply turned on successfully" - - elif command == Commands.POWER_SUPPLY_OFF: - success, error = await self.power_supply_control(turn_on=False) - if not success: - raise RuntimeError(f"Failed to turn off power supply: {error}") - await self.publish_status() - return "Power supply turned off successfully" - - elif command == Commands.READ_VOLTAGES: - voltages, error = await self.read_power_supply_voltages() - if not voltages and error: - raise RuntimeError(f"Failed to read voltages: {error}") - await self.publish_status() - return {"voltages": voltages} - - elif command == Commands.KILL: - asyncio.create_task(self.shutdown()) - await self.publish_status() - return "Server shutting down" - - elif command == Commands.GET_STATUS: - await self.publish_status() - return {"status": self.state.as_dict} - - elif command == Commands.LIST_STREAMS: - streams = [] - for name, stream in self.data_streams.items(): - streams.append({ - 'pva_name': name, - 'data_type': stream.data_type, - 'config': stream.config, - 'running': stream.running - }) - return {'streams': streams} - + raise RuntimeError(error) + else: - raise ValueError(f"Unknown command: {command.value}") - - # No need for a finally block since we publish_status in each branch - - #----------------------------------------------------------------------- - # Main server loop - #----------------------------------------------------------------------- - - async def process_data_stream(self) -> None: - """ - Process data from the NetworkInterface.cxx stream. - - This method receives raw data from the data socket and: - 1. Publishes it to the TEAM1K:RAW PVA channel - 2. 
Distributes it to all registered raw data queues - """ - self.logger.info("Data stream processor started") - - while self.running: + raise ValueError(f"Unknown command: {command}") + + return True + + def _start_daq(self) -> None: + """Start data acquisition.""" + if self._acquiring: + return + + # Start the acquisition subprocess if not running + if not self.acquisition.is_alive: + self.acquisition.start_process() + time.sleep(0.5) # Let subprocess initialize + + # Start data flow on the detector + self.commands.stop_data_flow() + time.sleep(0.01) + self.commands.start_data_flow() + + # Tell subprocess to start receiving + self.acquisition.start_acquisition() + + self._acquiring = True + self.pva.post_status("ACQUIRING", True) + self.pva.post_status("STATUS", ["ACQUIRING"]) + logger.info("DAQ started") + + def _stop_daq(self) -> None: + """Stop data acquisition.""" + if not self._acquiring: + return + + # Stop data flow on detector + self.commands.stop_data_flow() + + # Stop acquisition subprocess + self.acquisition.stop_acquisition() + + self._acquiring = False + self.pva.post_status("ACQUIRING", False) + self.pva.post_status("STATUS", ["IDLE"]) + logger.info("DAQ stopped") + + def _restart_daq(self) -> bool: + """Restart DAQ (stop then start).""" + self._stop_daq() + time.sleep(0.1) + self._start_daq() + return True + + def _status_update_loop(self) -> None: + """Periodically update status PVs.""" + while not self._shutdown_event.is_set(): try: - # Receive data with a timeout - try: - data = await asyncio.wait_for(self.data_socket.recv(), timeout=1.0) - - # Distribute to all raw data queues - for name, q in self.raw_data_queues.items(): - try: - # Non-blocking put to avoid hanging if queue is full - if not q.full(): - q.put_nowait(data) - except Exception as e: - self.logger.error(f"Error putting data in queue {name}: {e}") - - except asyncio.TimeoutError: - # No data received in timeout period - continue - + if self._pva_streamer: + 
self.pva.post_status("FRAME_RATE", self._pva_streamer.frame_rate) + self.pva.post_status("FRAME_COUNT", self._pva_streamer.frame_count) + + self.pva.post_status("FILE_WRITING", self.file_writer.is_enabled) + self.pva.post_status("SCAN_NUMBER", self.file_writer.scan_number) + self.pva.post_status("OUTPUT_DIR", self.file_writer.output_dir) except Exception as e: - self.logger.error(f"Error in data stream processing: {e}") - await asyncio.sleep(1.0) # Avoid rapid error loops - - self.logger.info("Data stream processor stopped") - - async def process_data_queues(self) -> None: - """ - Process data from queues and publish to PVA channels. - """ - self.logger.info("Queue processor started") - - while self.running: - try: - # Process all active data streams - for name, stream in list(self.data_streams.items()): - if stream.queue and stream.connected_clients > 0: - # Check if there's data in the queue - if not stream.queue.empty(): - try: - data = stream.queue.get_nowait() - - # Process the data based on the data type - if stream.data_type == 'NTNDArray': - # Format as image data - dimensions = stream.config.get('dimensions', (1024, 1024)) - width, height = dimensions - - # Reshape data to dimensions if needed - # Assuming data is raw bytes of 16-bit pixels - frame_array = { - 'value': data, - 'dimension': [ - {'size': height, 'offset': 0, 'fullSize': height, 'binning': 1, 'reverse': False}, - {'size': width, 'offset': 0, 'fullSize': width, 'binning': 1, 'reverse': False} - ] - } - - # Post to the PV - if stream.pv: - stream.pv.post(Value(stream.pv.nt, frame_array)) - - elif stream.data_type == 'ScalarArray': - # Format as scalar array - array_type = stream.config.get('type', 'd') - - # Post to the PV - if stream.pv: - stream.pv.post(Value(stream.pv.nt, {'value': data})) - - except queue.Empty: - pass # Queue was emptied by another task - except Exception as e: - self.logger.error(f"Error processing queue for {name}: {e}") - - # Small sleep to avoid tight loop - await 
asyncio.sleep(0.01) - - except Exception as e: - self.logger.error(f"Error in queue processor: {e}") - await asyncio.sleep(1.0) # Avoid rapid error loops - - self.logger.info("Queue processor stopped") - - async def periodic_status_update(self) -> None: - """Periodically publish status updates.""" - self.logger.info("Status updater started") - - while self.running: - try: - await self.publish_status() - await asyncio.sleep(STATUS_UPDATE_INTERVAL) - except Exception as e: - self.logger.error(f"Error in status updater: {e}") - await asyncio.sleep(1.0) # Avoid rapid error loops - - self.logger.info("Status updater stopped") - - # No longer need the _process_stream method as we now use queues for data distribution - - async def handle_requests(self) -> None: - """Handle incoming command requests.""" - while self.running: - try: - # Wait for a request - request = await self.cmd_socket.recv_string() - self.logger.info(f"Received request: {request}") - - # Forward the command to the status socket so clients can see it - command_info = { - 'type': 'command', - 'command': request, - 'timestamp': time.time() - } - await self.pub_socket.send_pyobj(command_info) - - try: - # Process the request - response = await self.handle_command(request) - - # Send the successful response - await self.cmd_socket.send_pyobj(response) - self.logger.info(f"Sent response: {response}") - - # Publish status update after command completion - await self.publish_status() - - # Also publish the command result - result_info = { - 'type': 'result', - 'command': request, - 'result': response, - 'timestamp': time.time() - } - await self.pub_socket.send_pyobj(result_info) - - except Exception as e: - # Send the exception directly as the error response - self.logger.error(f"Command error: {e}") - await self.cmd_socket.send_pyobj(e) - self.logger.info(f"Sent error: {e}") - - # Publish status update and error info - await self.publish_status() - - # Also publish the command error - error_info = { - 'type': 
'error', - 'command': request, - 'error': str(e), - 'timestamp': time.time() - } - await self.pub_socket.send_pyobj(error_info) - - except Exception as e: - self.logger.error(f"Fatal error handling request: {e}") - # Try to send an error response if possible - try: - await self.cmd_socket.send_pyobj(Exception(f"Fatal server error: {e}")) - except: - pass - - async def run(self) -> None: + logger.debug("Status update error: %s", e) + + self._shutdown_event.wait(1.0) + + def run(self) -> None: """ Run the server. - - This starts all the necessary tasks and keeps the server running - until shutdown is called. + + 1. Initialize detector + 2. Start PVA server + 3. Start acquisition subprocess + 4. Start file writer and PVA streamer threads + 5. Block until shutdown signal """ - self.running = True - + # Initialize + self.initialize() + + # Set up PVA server + self.pva.setup() + + # Start acquisition subprocess + self.acquisition.start_process() + time.sleep(0.5) + + # Start PVA streamer thread + self._pva_streamer = PVAStreamer( + self.acquisition.ring_name, + self.pva.image_pv, + self.chip_config, + ) + self._pva_streamer.start() + + # Start file writer thread + self.file_writer.start() + + # Start status update thread + status_thread = threading.Thread(target=self._status_update_loop, + daemon=True, name="team1k-status") + status_thread.start() + + # Update initial status + self.pva.post_status("STATUS", ["READY"]) + self.pva.post_status("EXPOSURE_MODE", self.commands.exposure_mode) + + logger.info("Server running. 
Press Ctrl+C to stop.") + # Set up signal handlers - for sig in (signal.SIGINT, signal.SIGTERM): - asyncio.get_event_loop().add_signal_handler(sig, lambda: asyncio.create_task(self.shutdown())) - - # Start tasks - self._tasks = [ - asyncio.create_task(self.handle_requests()), - asyncio.create_task(self.periodic_status_update()), - asyncio.create_task(self.process_data_stream()), - asyncio.create_task(self.process_data_queues()) - ] - - # Log startup - self.logger.info(f"Server started on ports: {self.cmd_port} (commands), {self.pub_port} (status), {self.data_port} (data)") - - # Wait for tasks to complete - await asyncio.gather(*self._tasks, return_exceptions=True) - - async def shutdown(self) -> None: - """Shut down the server.""" - if not self.running: - return - - self.logger.info("Shutting down server...") - self.running = False - - # Cancel all tasks - for task in self._tasks: - task.cancel() - - # No subscribers to remove - - # Shut down KTest client if it exists - if hasattr(self, '_k_test_process') and self._k_test_process: - self._k_test_process.terminate() - try: - self._k_test_process.wait(timeout=5) - except subprocess.TimeoutExpired: - self._k_test_process.kill() - - # Close ZMQ sockets - for socket in (self.cmd_socket, self.pub_socket, self.data_socket): - socket.close() - - # Terminate ZMQ context - self.context.term() - - # Shut down EPICS PVA - if self.pva_server: - self.pva_server.stop() - - self.logger.info("Server shutdown complete") + def _handle_signal(signum, _frame): + logger.info("Received signal %d, shutting down...", signum) + self._shutdown_event.set() + signal.signal(signal.SIGINT, _handle_signal) + signal.signal(signal.SIGTERM, _handle_signal) -#--------------------------------------------------------------------------- -# EPICS PVA Handler -#--------------------------------------------------------------------------- + # Block until shutdown + self._shutdown_event.wait() + + # Shutdown sequence + self.shutdown() + + def shutdown(self) 
-> None: + """Clean shutdown of all components.""" + logger.info("Shutting down server...") + self._shutdown_event.set() + + # Stop DAQ + if self._acquiring: + self._stop_daq() + + # Stop PVA streamer + if self._pva_streamer: + self._pva_streamer.stop() + + # Stop file writer + self.file_writer.stop() + + # Shutdown acquisition subprocess + self.acquisition.shutdown() + + # Stop PVA server + self.pva.stop() + + # Close register interface + self.registers.close() + + logger.info("Server shutdown complete") -class PVAHandler: - """Handler for EPICS PV Access.""" - - def __init__(self, server, pva_name=None): - """ - Initialize the PVA handler. - - Args: - server: The Team1kServer instance - pva_name: The PVA channel name this handler is for - """ - self.server = server - self.pva_name = pva_name - self.logger = logging.getLogger("PVAHandler") - - def put(self, pv, op): - """Called when a client tries to update a PV.""" - self.logger.warning(f"Client attempted to put to read-only PV: {op.name()}") - op.done(error="PV is read-only") - - def rpc(self, pv, op): - """Called when a client makes an RPC call.""" - self.logger.warning(f"Client attempted RPC on PV: {op.name()}") - op.done(error="RPC not supported") - - def onFirstConnect(self, pv): - """Called when the first client connects to this PV.""" - pv_name = self.pva_name or "unknown" - self.logger.info(f"First client connected to PV: {pv_name}") - - # Increment connected clients count if this is a data stream - if self.pva_name and self.pva_name in self.server.data_streams: - stream = self.server.data_streams[self.pva_name] - stream.connected_clients += 1 - - def onLastDisconnect(self, pv): - """Called when the last client disconnects from this PV.""" - pv_name = self.pva_name or "unknown" - self.logger.info(f"Last client disconnected from PV: {pv_name}") - - # Decrement connected clients count if this is a data stream - if self.pva_name and self.pva_name in self.server.data_streams: - stream = 
self.server.data_streams[self.pva_name] - if stream.connected_clients > 0: - stream.connected_clients -= 1#--------------------------------------------------------------------------- -# Main entry point -#--------------------------------------------------------------------------- def main(): """Main entry point for the server.""" - # Configure logging - logging.basicConfig(level=logging.INFO) - - # Create and run the server - server = Team1kServer() - + parser = argparse.ArgumentParser(description="Team1k Detector Server") + parser.add_argument('--detector-ip', default=DEFAULT_DETECTOR_IP, + help=f"Detector IP address (default: {DEFAULT_DETECTOR_IP})") + parser.add_argument('--register-port', type=int, default=DEFAULT_REGISTER_PORT, + help=f"Detector register port (default: {DEFAULT_REGISTER_PORT})") + parser.add_argument('--data-port', type=int, default=DEFAULT_DATA_PORT, + help=f"Detector data port (default: {DEFAULT_DATA_PORT})") + parser.add_argument('--pv-prefix', default=DEFAULT_PV_PREFIX, + help=f"PVA prefix (default: {DEFAULT_PV_PREFIX})") + parser.add_argument('--config', default=None, + help="Parameter file to apply on startup") + parser.add_argument('--bellow-port', default=DEFAULT_BELLOW_PORT, + help=f"Bellow stage serial port (default: {DEFAULT_BELLOW_PORT})") + parser.add_argument('--log-level', default='INFO', + choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], + help="Logging level (default: INFO)") + args = parser.parse_args() + + logging.basicConfig( + level=getattr(logging, args.log_level), + format='%(asctime)s %(name)-30s %(levelname)-8s %(message)s', + ) + + server = Team1kServer( + detector_ip=args.detector_ip, + register_port=args.register_port, + data_port=args.data_port, + pv_prefix=args.pv_prefix, + config_file=args.config, + bellow_port=args.bellow_port, + ) + try: - # Run the server - asyncio.run(server.run()) - except KeyboardInterrupt: - logging.info("Keyboard interrupt received, shutting down") + server.run() except Exception as e: - 
logging.error(f"Error running server: {e}") + logger.error("Fatal server error: %s", e) return 1 - + return 0 if __name__ == "__main__": sys.exit(main()) -