diff --git a/pyproject.toml b/pyproject.toml index 4bdb54f..9dd4f82 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,3 @@ team1k-server = "team1k.server:main" where = ["src"] include = ["team1k*"] exclude = ["tests*"] - -[tool.setuptools] -packages = ["team1k"] \ No newline at end of file diff --git a/src/team1k.egg-info/PKG-INFO b/src/team1k.egg-info/PKG-INFO new file mode 100644 index 0000000..87798c6 --- /dev/null +++ b/src/team1k.egg-info/PKG-INFO @@ -0,0 +1,152 @@ +Metadata-Version: 2.4 +Name: team1k +Version: 0.0.1 +Summary: Controls for the TEAM1k detector +Author-email: Sebastian Strempfer +Requires-Python: >=3.9 +Description-Content-Type: text/markdown +Requires-Dist: numpy>=1.24.0 +Requires-Dist: pyserial>=3.5 +Requires-Dist: pydantic>=2.0.0 +Requires-Dist: typing-extensions>=4.5.0 + +# Team1k: X-ray Detector Control System + +This package provides control software for the Team1k X-ray detector system, including: + +1. A server that manages: + - Camera bellow stage (linear motion) + - Power supply control + - Detector control via KTest commands + - Data streaming to EPICS PVA + +2. Client libraries for both synchronous and asynchronous operation + +## Installation + +```bash +pip install -e . +``` + +## Server Usage + +Start the Team1k server: + +```bash +team1k-server +``` + +The server exposes: +- ZMQ REP socket on port 42003 for commands +- ZMQ PUB socket on port 42004 for status updates +- ZMQ SUB socket on port 42005 for data from NetworkInterface.cxx +- EPICS PVA for data streaming (if p4p is installed) + +## Client Usage + +### Basic Example + +```python +from team1k import Client, ExposureModes, TriggerModes + +# Create a client +client = Client('localhost') + +# Control the bellow stage +client.insert_detector() +client.retract_detector() + +# Control the power supply +client.power_supply_on() +voltages = client.read_voltages() +client.power_supply_off() + +# Configure the detector +client.set_exposure_mode(ExposureModes.ROLLING_SHUTTER) +client.set_trigger_mode(TriggerModes.INTERNAL_TRIGGER) +client.set_integration_time(100.0) # ms + +# Start/stop acquisition +client.start_daq() +client.stop_daq() + +# Close the client +client.close() +``` + +### Async Client + +```python +import asyncio +from team1k import AsyncClient + +async def main(): + # Create an async client + client = AsyncClient('localhost') + + # Subscribe to status updates + def status_callback(status): + print(f"Status update: {status}") + + await client.subscribe_to_status(status_callback) + + # Get current status + status = await client.get_status() + print(f"Current status: {status}") + + # Insert detector + await client.insert_detector() + + # Close client + await client.close() + +asyncio.run(main()) +``` + +## Example Scripts + +The `examples` directory contains sample scripts: + +- `client_demo.py`: Demonstrates synchronous and asynchronous client usage +- `status_monitor.py`: Real-time monitoring of system status + +## Server Commands + +The server accepts the following commands: + +| Command | Description | +|---------|-------------| +| `insert` | Insert the detector into the beam | +| `retract` | Retract the detector from the beam | +| `power_on` | Turn on the detector power supply | +| `power_off` | Turn off the detector power supply | +| `read_voltages` | Read the voltage values from the power supply | +| `get_status` | Get the current status of all devices | +| `subscribe PORT` | Subscribe to status updates with a callback port | +| `unsubscribe` | Unsubscribe from status updates | +| `kill` | Shut down the server | + +Additionally, all commands from the KTestCommand enum are supported. + +## Architecture + +The system uses a distributed architecture: + +1. **Team1k Server**: + - Manages all devices + - Provides command interface + - Publishes status updates + - Streams data to EPICS PVA + +2. **NetworkInterface.cxx**: + - Communicates with the detector hardware + - Streams data to a ZMQ socket + +3. **Team1k Client**: + - Connects to the server + - Sends commands + - Receives status updates + +4. **EPICS PVA**: + - Provides standard interface for data acquisition systems + - Allows integration with other EPICS tools diff --git a/src/team1k.egg-info/SOURCES.txt b/src/team1k.egg-info/SOURCES.txt new file mode 100644 index 0000000..62a207e --- /dev/null +++ b/src/team1k.egg-info/SOURCES.txt @@ -0,0 +1,12 @@ +README.md +pyproject.toml +src/team1k/Client.py +src/team1k/KTestClient.py +src/team1k/__init__.py +src/team1k/server.py +src/team1k.egg-info/PKG-INFO +src/team1k.egg-info/SOURCES.txt +src/team1k.egg-info/dependency_links.txt +src/team1k.egg-info/entry_points.txt +src/team1k.egg-info/requires.txt +src/team1k.egg-info/top_level.txt \ No newline at end of file diff --git a/src/team1k.egg-info/dependency_links.txt b/src/team1k.egg-info/dependency_links.txt new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/src/team1k.egg-info/dependency_links.txt @@ -0,0 +1 @@ + diff --git a/src/team1k.egg-info/entry_points.txt b/src/team1k.egg-info/entry_points.txt new file mode 100644 index 0000000..0c1f31c --- /dev/null +++ b/src/team1k.egg-info/entry_points.txt @@ -0,0 +1,2 @@ +[console_scripts] +team1k-server = team1k.server:main diff --git a/src/team1k.egg-info/requires.txt b/src/team1k.egg-info/requires.txt new file mode 100644 index 0000000..19d5195 --- /dev/null +++ b/src/team1k.egg-info/requires.txt @@ -0,0 +1,4 @@ +numpy>=1.24.0 +pyserial>=3.5 +pydantic>=2.0.0 +typing-extensions>=4.5.0 diff --git a/src/team1k.egg-info/top_level.txt b/src/team1k.egg-info/top_level.txt new file mode 100644 index 0000000..33f5e67 --- /dev/null +++ b/src/team1k.egg-info/top_level.txt @@ -0,0 +1 @@ +team1k diff --git a/src/team1k/Client.py b/src/team1k/Client.py index d4c6916..8d1ed28 100644 --- a/src/team1k/Client.py +++ b/src/team1k/Client.py @@ -1,64 +1,598 @@ -import socket +#!/usr/bin/env python3 +""" +Team1k Client - Interface to the Team1k detector control server. +""" + +import uuid import time -from KTestClient import KTestClient, KTestCommand, KTestError, ExposureModes, TriggerModes +import socket +import asyncio +import threading +from typing import Dict, List, Any, Optional, Tuple, Union, Callable + +import zmq +import zmq.asyncio + +from .KTestClient import KTestClient, KTestCommand, KTestError, ExposureModes, TriggerModes + + +class CommandException(Exception): + """Exception raised when a command fails on the server side.""" + pass + class Client: - def __init__(self, host: str = 'localhost', port: int = 42003): - self.host = host - self.port = port - self.k_test_client = KTestClient(host, port) - - def send_command(self, command: KTestCommand, *args) -> str: + """ + Client for the Team1k server. + + Provides methods to send commands and receive status updates. + """ + + def __init__(self, host: str = 'localhost', cmd_port: int = 42003, pub_port: int = 42004): """ - Send a command to the KTestServer and return the response. + Initialize the Team1k client. Args: - command (KTestCommand): Command enum to send - *args: Arguments for the command - Returns: - str: Response message + host: The hostname of the server + cmd_port: The command socket port + pub_port: The status publication port """ - return self.k_test_client.send_command(command, *args) + self.host = host + self.cmd_port = cmd_port + self.pub_port = pub_port + + # Set up ZMQ for commands + self.context = zmq.Context() + self.cmd_socket = self.context.socket(zmq.REQ) + self.cmd_socket.connect(f"tcp://{host}:{cmd_port}") + + # Status subscriber + self.sub_socket = None + self._status_thread = None + self._status_callback = None + self._status_running = False + self._last_status = None - def reset_connection(self) -> None: + def send_command(self, command: Union[str, KTestCommand], *args) -> Any: + """ + Send a command to the server. + + Args: + command: A string command or KTestCommand enum + *args: Arguments for the command + + Returns: + The server response + + Raises: + CommandException: If the command fails on the server side + Exception: For other errors + """ + # Format command string + if isinstance(command, KTestCommand): + cmd_str = f"{command.value.lower()} {' '.join(str(arg) for arg in args)}" + else: + cmd_str = f"{command} {' '.join(str(arg) for arg in args)}" + + # Send command + self.cmd_socket.send_string(cmd_str.strip()) + + # Get response + response = self.cmd_socket.recv_pyobj() + + # Handle exceptions from the server + if isinstance(response, Exception): + if isinstance(response, (ValueError, RuntimeError)): + raise CommandException(str(response)) + else: + raise response + + return response + + def subscribe_to_status(self, callback: Callable[[Dict[str, Any]], None]) -> None: + """ + Subscribe to status updates. + + Args: + callback: Function to call with status updates + """ + # Stop existing subscription if there is one + self.unsubscribe_from_status() + + # Create a ZMQ SUB socket + sub_socket = self.context.socket(zmq.SUB) + sub_socket.setsockopt_string(zmq.SUBSCRIBE, '') + sub_socket.connect(f"tcp://{self.host}:{self.pub_port}") + + # Set up status thread + self.sub_socket = sub_socket + self._status_callback = callback + self._status_running = True + self._status_thread = threading.Thread(target=self._status_listener, daemon=True) + self._status_thread.start() + + def unsubscribe_from_status(self) -> None: + """Stop receiving status updates.""" + if not self._status_running: + return + + # Stop the status thread + self._status_running = False + if self._status_thread: + self._status_thread.join(timeout=1.0) + + # Close the socket + if self.sub_socket: + self.sub_socket.close() + self.sub_socket = None + + def _status_listener(self) -> None: + """Background thread to listen for status updates.""" + poller = zmq.Poller() + poller.register(self.sub_socket, zmq.POLLIN) + + while self._status_running: + try: + socks = dict(poller.poll(timeout=1000)) + if self.sub_socket in socks: + status = self.sub_socket.recv_pyobj() + + # Store status + self._last_status = status + + # Call callback + if self._status_callback: + self._status_callback(status) + except Exception as e: + print(f"Error in status listener: {e}") + + def get_status(self) -> Dict[str, Any]: + """ + Get the current status from the server. + + Returns: + The current system status + """ + if self._last_status is not None: + return self._last_status + + return self.send_command("get_status") + + def insert_detector(self) -> str: + """ + Insert the detector into the beam. + + Returns: + Success message + """ + return self.send_command("insert") + + def retract_detector(self) -> str: + """ + Retract the detector from the beam. + + Returns: + Success message + """ + return self.send_command("retract") + + def power_supply_on(self) -> str: + """ + Turn on the detector power supply. + + Returns: + Success message + """ + return self.send_command("power_on") + + def power_supply_off(self) -> str: + """ + Turn off the detector power supply. + + Returns: + Success message + """ + return self.send_command("power_off") + + def read_voltages(self) -> Dict[str, List[float]]: + """ + Read the power supply voltages. + + Returns: + Dictionary with voltage values + """ + return self.send_command("read_voltages") + + # KTest command convenience methods + + def reset_connection(self) -> Any: """ Reset the connection between the k_test server and the detector. + + Returns: + Server response """ - response = self.send_command(KTestCommand.RESET_CONNECTION) - time.sleep(1) # Wait for the connection to stabilize + return self.send_command(KTestCommand.RESET_CONNECTION) - def start_daq(self) -> None: + def start_daq(self) -> Any: """ Start the data acquisition process. - """ - self.send_command(KTestCommand.RESTART_DAQ) - def stop_daq(self) -> None: + Returns: + Server response + """ + return self.send_command(KTestCommand.RESTART_DAQ) + + def stop_daq(self) -> Any: """ Stop the data acquisition process. + + Returns: + Server response """ - self.send_command(KTestCommand.STOP_DAQ) - - def set_exposure_mode(self, exposure_mode: ExposureModes) -> None: + return self.send_command(KTestCommand.STOP_DAQ) + + def set_exposure_mode(self, exposure_mode: ExposureModes) -> Any: """ Set the exposure mode for the detector. + + Args: + exposure_mode: The exposure mode to set + + Returns: + Server response """ - self.send_command(KTestCommand.SET_EXPOSURE_MODE, exposure_mode) - - def set_trigger_mode(self, trigger_mode: TriggerModes) -> None: + return self.send_command(KTestCommand.SET_EXPOSURE_MODE, exposure_mode) + + def set_trigger_mode(self, trigger_mode: TriggerModes) -> Any: """ Set the trigger mode for the detector. + + Args: + trigger_mode: The trigger mode to set + + Returns: + Server response """ - self.send_command(KTestCommand.SET_TRIGGER_MODE, trigger_mode) - - def set_integration_time(self, integration_time_ms: float) -> None: + return self.send_command(KTestCommand.SET_TRIGGER_MODE, trigger_mode) + + def set_integration_time(self, integration_time_ms: float) -> Any: """ Set the integration time in milliseconds. + + Args: + integration_time_ms: The integration time in milliseconds + + Returns: + Server response """ - self.send_command(KTestCommand.SET_INTEGRATION_TIME, integration_time_ms) - - def load_parameter_file(self, file_path: str) -> None: + return self.send_command(KTestCommand.SET_INTEGRATION_TIME, integration_time_ms) + + def load_parameter_file(self, file_path: str) -> Any: """ Load a parameter file for the detector. + + Args: + file_path: Path to the parameter file + + Returns: + Server response """ - self.send_command(KTestCommand.LOAD_PARAMETER_FILE, file_path) \ No newline at end of file + return self.send_command(KTestCommand.LOAD_PARAMETER_FILE, file_path) + + def close(self) -> None: + """Close the client connections.""" + # Unsubscribe if needed + self.unsubscribe_from_status() + + # Close command socket + if self.cmd_socket: + self.cmd_socket.close() + + # Terminate context + if self.context: + self.context.term() + + +class AsyncClient: + """ + Asynchronous client for the Team1k server. + + Provides async methods to send commands and receive status updates. + """ + + def __init__(self, host: str = 'localhost', cmd_port: int = 42003, pub_port: int = 42004): + """ + Initialize the asynchronous Team1k client. + + Args: + host: The hostname of the server + cmd_port: The command socket port + pub_port: The status publication port + """ + self.host = host + self.cmd_port = cmd_port + self.pub_port = pub_port + + # Set up ZMQ for commands + self.context = zmq.asyncio.Context() + self.cmd_socket = self.context.socket(zmq.REQ) + self.cmd_socket.connect(f"tcp://{host}:{cmd_port}") + + # Status subscriber + self.sub_socket = None + self._status_task = None + self._status_callbacks = [] + self._status_running = False + self._last_status = None + + async def send_command(self, command: Union[str, KTestCommand], *args) -> Any: + """ + Send a command to the server asynchronously. + + Args: + command: A string command or KTestCommand enum + *args: Arguments for the command + + Returns: + The server response + + Raises: + CommandException: If the command fails on the server side + Exception: For other errors + """ + # Format command string + if isinstance(command, KTestCommand): + cmd_str = f"{command.value.lower()} {' '.join(str(arg) for arg in args)}" + else: + cmd_str = f"{command} {' '.join(str(arg) for arg in args)}" + + # Send command + await self.cmd_socket.send_string(cmd_str.strip()) + + # Get response + response = await self.cmd_socket.recv_pyobj() + + # Handle exceptions from the server + if isinstance(response, Exception): + if isinstance(response, (ValueError, RuntimeError)): + raise CommandException(str(response)) + else: + raise response + + return response + + async def subscribe_to_status(self, callback: Callable[[Dict[str, Any]], None]) -> None: + """ + Subscribe to status updates asynchronously. + + Args: + callback: Function to call with status updates + """ + # Add callback if we're already subscribed + if self._status_running and self.sub_socket: + self._status_callbacks.append(callback) + return + + # Create a ZMQ SUB socket + sub_socket = self.context.socket(zmq.SUB) + sub_socket.setsockopt_string(zmq.SUBSCRIBE, '') + sub_socket.connect(f"tcp://{self.host}:{self.pub_port}") + + # Set up status task + self.sub_socket = sub_socket + self._status_callbacks = [callback] + self._status_running = True + self._status_task = asyncio.create_task(self._status_listener()) + + async def unsubscribe_from_status(self) -> None: + """Stop receiving status updates asynchronously.""" + if not self._status_running: + return + + # Stop the status task + self._status_running = False + if self._status_task: + self._status_task.cancel() + try: + await self._status_task + except asyncio.CancelledError: + pass + + # Close the socket + if self.sub_socket: + self.sub_socket.close() + self.sub_socket = None + + async def _status_listener(self) -> None: + """Background task to listen for status updates.""" + while self._status_running: + try: + # Receive message + status = await self.sub_socket.recv_pyobj() + + # Store status + self._last_status = status + + # Call callbacks + for callback in self._status_callbacks: + callback(status) + except asyncio.CancelledError: + # Task was cancelled + break + except Exception as e: + print(f"Error in status listener: {e}") + + async def get_status(self) -> Dict[str, Any]: + """ + Get the current status from the server asynchronously. + + Returns: + The current system status + """ + if self._last_status is not None: + return self._last_status + + return await self.send_command("get_status") + + async def insert_detector(self) -> str: + """ + Insert the detector into the beam asynchronously. + + Returns: + Success message + """ + return await self.send_command("insert") + + async def retract_detector(self) -> str: + """ + Retract the detector from the beam asynchronously. + + Returns: + Success message + """ + return await self.send_command("retract") + + async def power_supply_on(self) -> str: + """ + Turn on the detector power supply asynchronously. + + Returns: + Success message + """ + return await self.send_command("power_on") + + async def power_supply_off(self) -> str: + """ + Turn off the detector power supply asynchronously. + + Returns: + Success message + """ + return await self.send_command("power_off") + + async def read_voltages(self) -> Dict[str, List[float]]: + """ + Read the power supply voltages asynchronously. + + Returns: + Dictionary with voltage values + """ + return await self.send_command("read_voltages") + + # KTest command convenience methods + + async def reset_connection(self) -> Any: + """ + Reset the connection between the k_test server and the detector asynchronously. + + Returns: + Server response + """ + return await self.send_command(KTestCommand.RESET_CONNECTION) + + async def start_daq(self) -> Any: + """ + Start the data acquisition process asynchronously. + + Returns: + Server response + """ + return await self.send_command(KTestCommand.RESTART_DAQ) + + async def stop_daq(self) -> Any: + """ + Stop the data acquisition process asynchronously. + + Returns: + Server response + """ + return await self.send_command(KTestCommand.STOP_DAQ) + + async def set_exposure_mode(self, exposure_mode: ExposureModes) -> Any: + """ + Set the exposure mode for the detector asynchronously. + + Args: + exposure_mode: The exposure mode to set + + Returns: + Server response + """ + return await self.send_command(KTestCommand.SET_EXPOSURE_MODE, exposure_mode) + + async def set_trigger_mode(self, trigger_mode: TriggerModes) -> Any: + """ + Set the trigger mode for the detector asynchronously. + + Args: + trigger_mode: The trigger mode to set + + Returns: + Server response + """ + return await self.send_command(KTestCommand.SET_TRIGGER_MODE, trigger_mode) + + async def set_integration_time(self, integration_time_ms: float) -> Any: + """ + Set the integration time in milliseconds asynchronously. + + Args: + integration_time_ms: The integration time in milliseconds + + Returns: + Server response + """ + return await self.send_command(KTestCommand.SET_INTEGRATION_TIME, integration_time_ms) + + async def load_parameter_file(self, file_path: str) -> Any: + """ + Load a parameter file for the detector asynchronously. + + Args: + file_path: Path to the parameter file + + Returns: + Server response + """ + return await self.send_command(KTestCommand.LOAD_PARAMETER_FILE, file_path) + + async def close(self) -> None: + """Close the client connections asynchronously.""" + # Unsubscribe if needed + await self.unsubscribe_from_status() + + # Close command socket + if self.cmd_socket: + self.cmd_socket.close() + + # Terminate context + if self.context: + self.context.term() + +# Example usage +if __name__ == "__main__": + # Synchronous example + client = Client() + print("Synchronous client:") + try: + print("Status:", client.get_status()) + print("Voltages:", client.read_voltages()) + except CommandException as e: + print(f"Command failed: {e}") + client.close() + + # Asynchronous example + async def main(): + client = AsyncClient() + print("\nAsynchronous client:") + try: + status = await client.get_status() + print("Status:", status) + voltages = await client.read_voltages() + print("Voltages:", voltages) + except CommandException as e: + print(f"Command failed: {e}") + await client.close() + + asyncio.run(main()) \ No newline at end of file diff --git a/src/team1k/__init__.py b/src/team1k/__init__.py index 6b0fbbd..8f81f07 100644 --- a/src/team1k/__init__.py +++ b/src/team1k/__init__.py @@ -1,2 +1,8 @@ -from .Client import Client, KTestCommand, KTestError, ExposureModes, TriggerModes -__all__ = ['Client', 'KTestCommand', 'KTestError', 'ExposureModes', 'TriggerModes'] \ No newline at end of file +from .Client import Client, AsyncClient +from .KTestClient import KTestCommand, KTestError, ExposureModes, TriggerModes + +__all__ = [ + 'Client', 'AsyncClient', + 'KTestCommand', 'KTestError', + 'ExposureModes', 'TriggerModes' +] \ No newline at end of file diff --git a/src/team1k/server.py b/src/team1k/server.py index ab67843..3587402 100644 --- a/src/team1k/server.py +++ b/src/team1k/server.py @@ -1,167 +1,1196 @@ +#!/usr/bin/env python3 +""" +Team1k Server - Controls the Team1k X-ray detector system. + +This server manages: +1. A camera bellow stage (linear motion) +2. A power supply for the detector +3. The detector itself (via KTest commands) +4. Data streaming from the detector to EPICS PVA + +It provides both command-response (REP) and publish-subscribe (PUB) interfaces. +""" + import os +import sys +import enum +import time +import queue +import signal import socket +import asyncio +import logging +import pickle +import importlib +import dataclasses +import threading +from pathlib import Path +from typing import Dict, List, Any, Optional, Tuple, Union, Callable, Awaitable, TypeVar, Generic +import numpy as np + import zmq import zmq.asyncio -import asyncio import serial -from .KTestClient import KTestClient, KTestCommand, KTestError, KTEST_COMMAND_ARGS import psutil import subprocess -import logging -import enum +from p4p import Value, Type +from p4p.server import Server, ServerOperation +from p4p.server.thread import SharedPV +from p4p.nt import NTScalar, NTNDArray + +from .KTestClient import KTestClient, KTestCommand, KTestError, KTEST_COMMAND_ARGS, ExposureModes, TriggerModes + +# Configuration paths SW_PATH = "/home/daq-user/Team1k_ANL_UED_Camera/sw" K_TEST_CONFIG_PATH = "/home/daq-user/Team1k_ANL_UED_Camera/configuration/parameterfile_Team1k_UED_ANL.txt" +# Serial port for the camera bellow stage +BELLOW_STAGE_PORT = '/dev/CameraBellowStage' +BELLOW_STAGE_BAUDRATE = 9600 + +# ZMQ ports +CMD_PORT = 42003 # Command-response socket +PUB_PORT = 42004 # Status publication socket +DATA_PORT = 42005 # Data socket for NetworkInterface.cxx + +# Status update interval in seconds +STATUS_UPDATE_INTERVAL = 1.0 + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' +) + +#--------------------------------------------------------------------------- +# Commands and status enums +#--------------------------------------------------------------------------- + class Commands(enum.Enum): + """Commands that can be sent to the Team1kServer.""" + + # Device control commands INSERT = "insert" - """Insert the detector.""" + """Insert the detector into the beam.""" RETRACT = "retract" - """Retract the detector.""" - - POWER_SUPPLY = "power_supply" - """Power the detector on or off.""" + """Retract the detector from the beam.""" + POWER_SUPPLY_ON = "power_on" + """Turn the detector power supply on.""" + + POWER_SUPPLY_OFF = "power_off" + """Turn the detector power supply off.""" + + READ_VOLTAGES = "read_voltages" + """Read the voltage values from the power supply.""" + + # Server control commands KILL = "kill" """Kill this server.""" + + GET_STATUS = "get_status" + """Get the current status of all devices.""" + + # Stream processor commands + ADD_STREAM = "add_stream" + """Add a new data stream processor.""" + + REMOVE_STREAM = "remove_stream" + """Remove a data stream processor.""" + + LIST_STREAMS = "list_streams" + """List all active stream processors.""" -server_command_arg_types = { - Commands.INSERT: (), - Commands.RETRACT: (), - Commands.POWER_SUPPLY: (bool,), - Commands.KILL: (), -} -command_arg_types = {**server_command_arg_types, **KTEST_COMMAND_ARGS} +class DeviceType(enum.Enum): + """Types of devices managed by the server.""" + BELLOW_STAGE = "bellow_stage" + POWER_SUPPLY = "power_supply" + DETECTOR = "detector" + SERVER = "server" + + +class DeviceStatus(enum.Enum): + """Status of a device.""" + UNKNOWN = "unknown" + READY = "ready" + BUSY = "busy" + ERROR = "error" + DISCONNECTED = "disconnected" + + +@dataclasses.dataclass +class BellowStageState: + """State of the camera bellow stage.""" + status: DeviceStatus = DeviceStatus.UNKNOWN + position: int = 0 + is_inserted: bool = False + is_moving: bool = False + error_message: Optional[str] = None + + +@dataclasses.dataclass +class PowerSupplyState: + """State of the power supply.""" + status: DeviceStatus = DeviceStatus.UNKNOWN + is_on: bool = False + voltage_values: List[float] = dataclasses.field(default_factory=lambda: [0.0, 0.0, 0.0]) + error_message: Optional[str] = None + + +@dataclasses.dataclass +class DetectorState: + """State of the detector.""" + status: DeviceStatus = DeviceStatus.UNKNOWN + exposure_mode: Optional[ExposureModes] = None + trigger_mode: Optional[TriggerModes] = None + integration_time: float = 0.0 + is_acquiring: bool = False + error_message: Optional[str] = None + + +@dataclasses.dataclass +class SystemState: + """Overall state of the Team1k system.""" + bellow_stage: BellowStageState = dataclasses.field(default_factory=BellowStageState) + power_supply: PowerSupplyState = dataclasses.field(default_factory=PowerSupplyState) + detector: DetectorState = dataclasses.field(default_factory=DetectorState) + server_uptime: float = 0.0 + start_time: float = dataclasses.field(default_factory=time.time) + + @property + def as_dict(self) -> Dict[str, Any]: + """Convert the state to a dictionary.""" + return { + 'bellow_stage': dataclasses.asdict(self.bellow_stage), + 'power_supply': dataclasses.asdict(self.power_supply), + 'detector': dataclasses.asdict(self.detector), + 'server_uptime': time.time() - self.start_time + } + + +#--------------------------------------------------------------------------- +# Utility functions +#--------------------------------------------------------------------------- + +def parse_command(command_str: str) -> Tuple[Union[Commands, KTestCommand], List[Any]]: + """ + Parse a command string into a command enum and arguments. + + Args: + command_str: The command string to parse + + Returns: + A tuple of (command, args) where command is a Commands or KTestCommand enum + and args is a list of arguments + """ + parts = command_str.strip().lower().split() + if not parts: + raise ValueError("Empty command") + + cmd = parts[0] + args = parts[1:] + + # Try to match with server commands first + try: + return Commands(cmd), args + except ValueError: + pass + + # Try to match with KTest commands + try: + return KTestCommand(cmd.upper()), args + except ValueError: + raise ValueError(f"Unknown command: {cmd}") + + +def load_processor_function(processor_info: str) -> Callable: + """ + Dynamically load a processor function from a module. + + Args: + processor_info: String in format "module_name:function_name" + + Returns: + The loaded function + + Raises: + ValueError: If the processor_info is invalid or the function cannot be loaded + """ + module_name = None + func_name = None + + try: + parts = processor_info.split(':', 1) + if len(parts) != 2: + raise ValueError(f"Invalid processor format. Expected 'module:function', got '{processor_info}'") + + module_name, func_name = parts + module = importlib.import_module(module_name) + function = getattr(module, func_name) + + # Validate it's callable + if not callable(function): + raise ValueError(f"{func_name} is not callable") + + return function + except ImportError as e: + raise ValueError(f"Could not import module '{module_name}': {e}") + except AttributeError: + raise ValueError(f"Function '{func_name}' not found in module '{module_name}'") + except Exception as e: + raise ValueError(f"Error loading processor function: {e}") + +def parse_args(command: Union[Commands, KTestCommand], args: List[str]) -> List[Any]: + """ + Parse and validate command arguments. + + Args: + command: The command enum + args: The string arguments to parse + + Returns: + A list of parsed arguments + """ + if isinstance(command, Commands): + # Server commands with no arguments + if command in (Commands.INSERT, Commands.RETRACT, Commands.POWER_SUPPLY_ON, + Commands.POWER_SUPPLY_OFF, Commands.READ_VOLTAGES, Commands.KILL, + Commands.GET_STATUS, Commands.LIST_STREAMS): + if args: + raise ValueError(f"Command {command.value} takes no arguments") + return [] + + # Stream processor commands + elif command == Commands.ADD_STREAM: + if len(args) < 3: + raise ValueError(f"Command {command.value} requires at least port, pva_name, and dimensions") + + # Parse port + try: + port = int(args[0]) + except ValueError: + raise ValueError(f"Invalid port number: {args[0]}") + + # PVA name + pva_name = args[1] + + # Dimensions - comma-separated list of integers + try: + dimensions = tuple(int(d) for d in args[2].split(',')) + if len(dimensions) != 2: + raise ValueError(f"Dimensions must be two integers (width,height)") + except ValueError as e: + raise ValueError(f"Invalid dimensions: {e}") + + # Optional callback module and function + callback_info = None + if len(args) > 3: + callback_info = args[3] + + return [port, pva_name, dimensions, callback_info] + + elif command == Commands.REMOVE_STREAM: + if len(args) != 1: + raise ValueError(f"Command {command.value} requires a PVA name") + return [args[0]] + + else: + raise ValueError(f"Unknown server command: {command.value}") + else: + # KTest commands + expected_types = KTEST_COMMAND_ARGS.get(command, ()) + if len(args) != len(expected_types): + raise ValueError(f"Expected {len(expected_types)} arguments for {command.value}, got {len(args)}") + + parsed_args = [] + for i, (arg, arg_type) in enumerate(zip(args, expected_types)): + try: + if arg_type == bool: + # Handle bool conversion specially + if arg.lower() in ('true', '1', 'yes'): + parsed_args.append(True) + elif arg.lower() in ('false', '0', 'no'): + parsed_args.append(False) + else: + raise ValueError(f"Invalid boolean value: {arg}") + elif issubclass(arg_type, enum.Enum): + # Handle enum conversion + try: + parsed_args.append(arg_type(int(arg))) + except ValueError: + raise ValueError(f"Invalid enum value for {arg_type.__name__}: {arg}") + else: + parsed_args.append(arg_type(arg)) + except ValueError as e: + raise ValueError(f"Invalid argument {i+1} for {command.value}: {e}") + + return parsed_args + + +#--------------------------------------------------------------------------- +# Main server class +#--------------------------------------------------------------------------- class Team1kServer: - def __init__(self, host="*", port=42003): - self.host = host - self.port = port - self.context = zmq.asyncio.Context() - self.socket: zmq.asyncio.Socket = self.context.socket(zmq.REP) # REP socket - self.socket.bind(f"tcp://{self.host}:{self.port}") - self.loop = asyncio.get_event_loop() - self._k_test_process = None + """ + Server to control the Team1k detector system. + + Provides: + - ZMQ REP socket for command-response + - ZMQ PUB socket for status updates + - EPICS PV Access for data streaming + """ + + def __init__(self, cmd_port=CMD_PORT, pub_port=PUB_PORT, data_port=DATA_PORT): + """ + Initialize the Team1k server. + + Args: + cmd_port: Port for the ZMQ REP socket (commands) + pub_port: Port for the ZMQ PUB socket (status updates) + data_port: Port for the ZMQ data socket (from NetworkInterface.cxx) + """ + self.cmd_port = cmd_port + self.pub_port = pub_port + self.data_port = data_port + + # Setup logging self.logger = logging.getLogger("Team1kServer") - self.logger.setLevel(logging.DEBUG) + + # Initialize ZMQ + self.context = zmq.asyncio.Context() + + # Command socket (REP) + self.cmd_socket = self.context.socket(zmq.REP) + self.cmd_socket.bind(f"tcp://*:{self.cmd_port}") + + # Status publication socket (PUB) + self.pub_socket = self.context.socket(zmq.PUB) + self.pub_socket.bind(f"tcp://*:{self.pub_port}") + + # Data socket (SUB) - connects to NetworkInterface.cxx + self.data_socket = self.context.socket(zmq.SUB) + self.data_socket.setsockopt_string(zmq.SUBSCRIBE, '') + self.data_socket.connect(f"tcp://localhost:{self.data_port}") + + # Data distribution + self.raw_data_queues = {} # {name: queue.Queue} + self.data_streams = {} # {pva_name: DataStream} + + # System state + self.state = SystemState() + + # Asyncio event loop and tasks + self._k_test_process = None + self.running = False + self._tasks = [] + + # EPICS PVA server + self.pva_server = None + self.pv_dict = {} # Dictionary of all active PVs + self.setup_pva_server() + + @dataclasses.dataclass + class DataStream: + """Configuration for a PVA data stream.""" + pva_name: str + data_type: str # 'NTNDArray' or 'ScalarArray' + config: Dict[str, Any] # Configuration for the data type + queue: queue.Queue + pv: Optional[SharedPV] = None + connected_clients: int = 0 + running: bool = False + task: Optional[asyncio.Task] = None + + def get_raw_data_queue(self, name: str, maxsize: int = 10) -> queue.Queue: + """ + Get a queue that will receive raw data frames. + + Args: + name: Unique name for this queue (used for identification) + maxsize: Maximum queue size (older items will be dropped when full) + + Returns: + A queue object that will receive raw data frames + """ + if name in self.raw_data_queues: + return self.raw_data_queues[name] + + q = queue.Queue(maxsize=maxsize) + self.raw_data_queues[name] = q + self.logger.info(f"Created raw data queue: {name}") + return q + + def add_data_stream(self, name: str, data_type: str, config: Dict[str, Any], data_queue: queue.Queue) -> None: + """ + Add a data stream that will publish data from the queue to a PVA channel. + + Args: + name: The PVA name for this stream + data_type: Type of data ('NTNDArray' or 'ScalarArray') + config: Configuration for the data type + data_queue: Queue that will provide data to be published + """ + if name in self.data_streams: + raise ValueError(f"Data stream '{name}' already exists") + + # Validate data type + if data_type not in ('NTNDArray', 'ScalarArray'): + raise ValueError(f"Unsupported data type: {data_type}") + + # Create the appropriate PV based on type + if data_type == 'NTNDArray': + # For image data + dimensions = config.get('dimensions', (1024, 1024)) + pv = SharedPV( + handler=PVAHandler(self, pva_name=name), + nt=NTNDArray(), + initial=np.full((1024, 1024), 0) + ) + elif data_type == 'ScalarArray': + # For array data + array_type = config.get('type', 'd') # Default to double + length = config.get('length', 1) + pv = SharedPV( + handler=PVAHandler(self, pva_name=name), + nt=NTScalar('a' + array_type), + initial=Value(NTScalar.buildType('a' + array_type), {'value': np.zeros(length)}) + ) + pv.open(np.zeros(length)) + + # Create DataStream object + data_stream = self.DataStream( + pva_name=name, + data_type=data_type, + config=config, + queue=data_queue, + pv=pv + ) + + # Add to our registry + self.data_streams[name] = data_stream + + # Update PV dictionary and recreate server + self.pv_dict[name] = pv + if self.pva_server: + self.pva_server.stop() + self.pva_server = Server([self.pv_dict]) + + self.logger.info(f"Added data stream: {name} ({data_type})") + + def setup_pva_server(self): + """Set up the EPICS PV Access server.""" + + # Create only the status PV initially + self.status_pv = SharedPV( + nt=NTScalar('as'), + value=['INITIALIZING'] + ) + self.status_pv.open(["INITIALIZING"]) + + # Initialize the PV dictionary with just the status PV + self.pv_dict = {'TEAM1K:STATUS': self.status_pv} + + # Create server with the PV dictionary + self.pva_server = Server(providers=[self.pv_dict]) + + # Create raw data queue for the default stream + raw_queue = self.get_raw_data_queue('default', maxsize=30) + + # Add default raw data stream for backward compatibility + self.add_data_stream( + 'TEAM1K:RAW', + 'NTNDArray', + {'dimensions': (1024, 1024)}, + raw_queue + ) - async def handle_request(self, request): - # Process the incoming request - command = request.strip().lower().split() - if not command or command[0] == "": - return ValueError("Empty command") - - try: - command[0] = Commands(command[0]) if command[0] in Commands._value2member_map_ else KTestCommand(command[0].upper()) - except ValueError: - return ValueError(f"Unknown command: {command[0]}") - - arg_types = command_arg_types.get(command, None) - if arg_types is None: - return ValueError(f"Unknown command: {command[0]}") - if len(arg_types) != len(command) - 1: - return ValueError(f"Expected {len(arg_types)} arguments ({arg_types}), got {len(command) - 1}") - for i, arg_type in enumerate(arg_types): - try: - command[i + 1] = arg_type(command[i + 1]) - except ValueError: - return ValueError(f"Argument {i + 1} should be of type {arg_type.__name__}, got {command[i + 1]}") - - match command: - case [Commands.INSERT]: - response = await self.move_camera(insert=True) - case [Commands.RETRACT]: - response = await self.move_camera(insert=False) - case [Commands.KILL]: - response = True - await self.shutdown() - case [Commands.POWER_SUPPLY, state]: - # Turn power supply on or off - pass - case [cmd, *args] if isinstance(cmd, KTestCommand): - try: - response = self.k_client.send_command(cmd, *args) - except KTestError as e: - response = e - except Exception as e: - response = RuntimeError(f"Unexpected error: {e}") - case _: - response = ValueError(f"Unhandled command: {command}") - - return response + #----------------------------------------------------------------------- + # Device control methods + #----------------------------------------------------------------------- @property def k_client(self) -> KTestClient: + """ + Get or create a KTestClient instance. + + Returns: + A KTestClient connected to the detector + """ if not hasattr(self, '_k_client'): - # See if k_test server is running on localhost + # Check if k_test server is running on localhost if not any("k_test" in p.info['name'] for p in psutil.process_iter(['name'])): - # If not, start it + # Start k_test server if not running self.logger.info("Starting k_test server on localhost") k_test_cmd = ["k_test", K_TEST_CONFIG_PATH] self.logger.debug(f"Running command: {' '.join(k_test_cmd)} in {SW_PATH}") self._k_test_process = subprocess.Popen(k_test_cmd, cwd=SW_PATH) + time.sleep(1) # Give it time to start + # Create the client self._k_client = KTestClient("localhost") + + # Update detector status + self.state.detector.status = DeviceStatus.READY + return self._k_client - - async def move_camera(self, insert: bool): - """Move the camera bellow stage in or out.""" - self.logger.info("Inserting detector") - try: - with serial.Serial('/dev/CameraBellowStage', 9600, timeout=1) as ser: - await asyncio.sleep(1) - ser.write(b'\r') - await asyncio.sleep(1) - ser.write(b'\x03') # Send Ctrl-C to stop any ongoing movement - await asyncio.sleep(1) - - ser.write(b'rc=100\r') # Set speed to 100 - await asyncio.sleep(1) - - if insert: - ser.write(b'vm=100000\r') # Set maximum speed (we're being helped by atmospheric pressure) - await asyncio.sleep(1) - - ser.write(b'mr 2000000\r') # Move 2000000 (insert) TODO: Convert to absolute position - await asyncio.sleep(10) - else: - ser.write(b'vm=100001\r') # Set maximum speed (not sure where this value comes from) - await asyncio.sleep(1) - - ser.write(b'mr -2000000\r') # Move -2000000 (retract) TODO: Convert to absolute position - await asyncio.sleep(10) - - ser.write(b'pr p\r') # Print current position - await asyncio.sleep(1) - except serial.SerialTimeoutException as e: - self.logger.error(f"Serial timeout: {e}") - return TimeoutError(f"Server <--> Camera Bellow Stage serial timeout: {e}") - except serial.SerialException as e: - self.logger.error(f"Serial error: {e}") - return ConnectionError(f"Server <--> Camera Bellow Stage serial error: {e}") - except Exception as e: - self.logger.error(f"Unexpected error: {e}") - return RuntimeError(f"Server <--> Camera Bellow Stage unexpected error: {e}") - return True - - async def process_request(self, request): - self.logger.info(f"Received request: {request}") - response = await self.handle_request(request) - await self.socket.send_pyobj(response) - self.logger.info(f"Sent response: {response}") - - async def serve(self): - while True: - # Wait for a request - request = await self.socket.recv_string() - await self.process_request(request) - async def shutdown(self): - if self._k_test_process: + async def move_bellow_stage(self, insert: bool) -> Tuple[bool, Optional[str]]: + """ + Move the camera bellow stage in or out. + + Args: + insert: True to insert the detector, False to retract it + + Returns: + A tuple of (success, error_message) + """ + action = "Inserting" if insert else "Retracting" + self.logger.info(f"{action} detector") + + # Update state + self.state.bellow_stage.is_moving = True + self.state.bellow_stage.status = DeviceStatus.BUSY + await self.publish_status() + + try: + with serial.Serial(BELLOW_STAGE_PORT, BELLOW_STAGE_BAUDRATE, timeout=1) as ser: + # Initialize communication + await asyncio.sleep(0.5) + ser.write(b'\r') + await asyncio.sleep(0.5) + ser.write(b'\x03') # Ctrl-C to stop any ongoing movement + await asyncio.sleep(0.5) + + # Set speed + ser.write(b'rc=100\r') + await asyncio.sleep(0.5) + + # Move to position + if insert: + ser.write(b'vm=100000\r') # Max speed for insertion + await asyncio.sleep(0.5) + ser.write(b'mr 2000000\r') # Move relative + else: + ser.write(b'vm=100001\r') # Max speed for retraction + await asyncio.sleep(0.5) + ser.write(b'mr -2000000\r') # Move relative + + # Wait for movement to complete (could be improved with polling) + await asyncio.sleep(10) + + # Get final position + ser.write(b'pr p\r') # Print position + await asyncio.sleep(0.5) + response_bytes = ser.read_all() + response = response_bytes.decode('utf-8', errors='ignore') if response_bytes else "" + + # Parse position from response + try: + # Extract position value (assuming format like "p=12345") + for line in response.splitlines(): + if line.strip().startswith('p='): + position = int(line.strip()[2:]) + self.state.bellow_stage.position = position + break + except Exception as e: + self.logger.warning(f"Failed to parse position: {e}") + + # Update state + self.state.bellow_stage.is_inserted = insert + self.state.bellow_stage.is_moving = False + self.state.bellow_stage.status = DeviceStatus.READY + self.state.bellow_stage.error_message = None + + return True, None + + except serial.SerialTimeoutException as e: + error_msg = f"Serial timeout: {e}" + self.logger.error(error_msg) + self.state.bellow_stage.status = DeviceStatus.ERROR + self.state.bellow_stage.is_moving = False + self.state.bellow_stage.error_message = error_msg + return False, error_msg + + except serial.SerialException as e: + error_msg = f"Serial error: {e}" + self.logger.error(error_msg) + self.state.bellow_stage.status = DeviceStatus.ERROR + self.state.bellow_stage.is_moving = False + self.state.bellow_stage.error_message = error_msg + return False, error_msg + + except Exception as e: + error_msg = f"Unexpected error: {e}" + self.logger.error(error_msg) + self.state.bellow_stage.status = DeviceStatus.ERROR + self.state.bellow_stage.is_moving = False + self.state.bellow_stage.error_message = error_msg + return False, error_msg + + async def power_supply_control(self, turn_on: bool) -> Tuple[bool, Optional[str]]: + """ + Turn the power supply on or off. + + Args: + turn_on: True to turn on, False to turn off + + Returns: + A tuple of (success, error_message) + """ + action = "Turning on" if turn_on else "Turning off" + self.logger.info(f"{action} power supply") + + # Update state + self.state.power_supply.status = DeviceStatus.BUSY + await self.publish_status() + + try: + # Placeholder for actual power supply control + # Replace with your actual implementation + await asyncio.sleep(1) + + # Update state with the result + self.state.power_supply.is_on = turn_on + self.state.power_supply.status = DeviceStatus.READY + self.state.power_supply.error_message = None + + return True, None + + except Exception as e: + error_msg = f"Power supply error: {e}" + self.logger.error(error_msg) + self.state.power_supply.status = DeviceStatus.ERROR + self.state.power_supply.error_message = error_msg + return False, error_msg + + async def read_power_supply_voltages(self) -> Tuple[List[float], Optional[str]]: + """ + Read the voltage values from the power supply. + + Returns: + A tuple of (voltage_values, error_message) + """ + self.logger.info("Reading power supply voltages") + + try: + # Placeholder for actual voltage reading + # Replace with your actual implementation + voltage_values = [3.3, 5.0, 12.0] # Example values + + # Update state + self.state.power_supply.voltage_values = voltage_values + self.state.power_supply.status = DeviceStatus.READY + self.state.power_supply.error_message = None + + return voltage_values, None + + except Exception as e: + error_msg = f"Error reading voltages: {e}" + self.logger.error(error_msg) + self.state.power_supply.status = DeviceStatus.ERROR + self.state.power_supply.error_message = error_msg + return [], error_msg + + async def execute_ktest_command(self, command: KTestCommand, args: List[Any]) -> Tuple[Any, Optional[str]]: + """ + Execute a KTest command on the detector. + + Args: + command: The KTest command to execute + args: The arguments for the command + + Returns: + A tuple of (result, error_message) + """ + self.logger.info(f"Executing KTest command: {command.value} {args}") + + try: + # Update detector state + self.state.detector.status = DeviceStatus.BUSY + await self.publish_status() + + # Execute command + result = self.k_client.send_command(command, *args) + + # Update state based on command + if command == KTestCommand.SET_EXPOSURE_MODE and args: + self.state.detector.exposure_mode = args[0] + elif command == KTestCommand.SET_TRIGGER_MODE and args: + self.state.detector.trigger_mode = args[0] + elif command == KTestCommand.SET_INTEGRATION_TIME and args: + self.state.detector.integration_time = args[0] + elif command == KTestCommand.RESTART_DAQ: + self.state.detector.is_acquiring = True + elif command == KTestCommand.STOP_DAQ: + self.state.detector.is_acquiring = False + + self.state.detector.status = DeviceStatus.READY + self.state.detector.error_message = None + + return result, None + + except KTestError as e: + error_msg = f"KTest error: {e}" + self.logger.error(error_msg) + self.state.detector.status = DeviceStatus.ERROR + self.state.detector.error_message = error_msg + return None, error_msg + + except Exception as e: + error_msg = f"Unexpected error: {e}" + self.logger.error(error_msg) + self.state.detector.status = DeviceStatus.ERROR + self.state.detector.error_message = error_msg + return None, error_msg + + #----------------------------------------------------------------------- + # Status publication + #----------------------------------------------------------------------- + + async def publish_status(self) -> None: + """Publish current status to PUB socket and EPICS PV.""" + # Update the state data + state_dict = self.state.as_dict + + # Create a status message with type indicator + status_message = { + 'type': 'status', + 'status': state_dict, + 'timestamp': time.time() + } + + # Publish to ZMQ PUB socket + await self.pub_socket.send_pyobj(status_message) + + # Update EPICS PV + if self.pva_server: + status_items = [] + for device, state in state_dict.items(): + if isinstance(state, dict) and 'status' in state: + status_items.append(f"{device}: {state['status']}") + self.status_pv.post(Value(self.status_pv.nt.type, {'value': status_items})) + + #----------------------------------------------------------------------- + # Command handling + #----------------------------------------------------------------------- + + async def handle_command(self, command_str: str) -> Any: + """ + Handle a command string. + + Args: + command_str: The command string to handle + + Returns: + The response from the command handler + + Raises: + ValueError: If the command is invalid or arguments are incorrect + Exception: For other errors + """ + # Parse the command + command, arg_strings = parse_command(command_str) + + # Parse arguments + args = parse_args(command, arg_strings) + + # Handle server commands + if isinstance(command, Commands): + return await self.handle_server_command(command, args) + + # Handle KTest commands + return await self.execute_ktest_command(command, args) + + async def handle_server_command(self, command: Commands, args: List[Any]) -> Any: + """ + Handle a server command. + + Args: + command: The command to handle + args: The parsed arguments + + Returns: + A response containing the command result + + Raises: + ValueError: For invalid commands + RuntimeError: For execution errors + """ + if command == Commands.INSERT: + success, error = await self.move_bellow_stage(insert=True) + if not success: + raise RuntimeError(f"Failed to insert detector: {error}") + await self.publish_status() + return "Detector inserted successfully" + + elif command == Commands.RETRACT: + success, error = await self.move_bellow_stage(insert=False) + if not success: + raise RuntimeError(f"Failed to retract detector: {error}") + await self.publish_status() + return "Detector retracted successfully" + + elif command == Commands.POWER_SUPPLY_ON: + success, error = await self.power_supply_control(turn_on=True) + if not success: + raise RuntimeError(f"Failed to turn on power supply: {error}") + await self.publish_status() + return "Power supply turned on successfully" + + elif command == Commands.POWER_SUPPLY_OFF: + success, error = await self.power_supply_control(turn_on=False) + if not success: + raise RuntimeError(f"Failed to turn off power supply: {error}") + await self.publish_status() + return "Power supply turned off successfully" + + elif command == Commands.READ_VOLTAGES: + voltages, error = await self.read_power_supply_voltages() + if not voltages and error: + raise RuntimeError(f"Failed to read voltages: {error}") + await self.publish_status() + return {"voltages": voltages} + + elif command == Commands.KILL: + asyncio.create_task(self.shutdown()) + await self.publish_status() + return "Server shutting down" + + elif command == Commands.GET_STATUS: + await self.publish_status() + return {"status": self.state.as_dict} + + elif command == Commands.LIST_STREAMS: + streams = [] + for name, stream in self.data_streams.items(): + streams.append({ + 'pva_name': name, + 'data_type': stream.data_type, + 'config': stream.config, + 'running': stream.running + }) + return {'streams': streams} + + else: + raise ValueError(f"Unknown command: {command.value}") + + # No need for a finally block since we publish_status in each branch + + #----------------------------------------------------------------------- + # Main server loop + #----------------------------------------------------------------------- + + async def process_data_stream(self) -> None: + """ + Process data from the NetworkInterface.cxx stream. + + This method receives raw data from the data socket and: + 1. Publishes it to the TEAM1K:RAW PVA channel + 2. Distributes it to all registered raw data queues + """ + self.logger.info("Data stream processor started") + + while self.running: + try: + # Receive data with a timeout + try: + data = await asyncio.wait_for(self.data_socket.recv(), timeout=1.0) + + # Distribute to all raw data queues + for name, q in self.raw_data_queues.items(): + try: + # Non-blocking put to avoid hanging if queue is full + if not q.full(): + q.put_nowait(data) + except Exception as e: + self.logger.error(f"Error putting data in queue {name}: {e}") + + except asyncio.TimeoutError: + # No data received in timeout period + continue + + except Exception as e: + self.logger.error(f"Error in data stream processing: {e}") + await asyncio.sleep(1.0) # Avoid rapid error loops + + self.logger.info("Data stream processor stopped") + + async def process_data_queues(self) -> None: + """ + Process data from queues and publish to PVA channels. + """ + self.logger.info("Queue processor started") + + while self.running: + try: + # Process all active data streams + for name, stream in list(self.data_streams.items()): + if stream.queue and stream.connected_clients > 0: + # Check if there's data in the queue + if not stream.queue.empty(): + try: + data = stream.queue.get_nowait() + + # Process the data based on the data type + if stream.data_type == 'NTNDArray': + # Format as image data + dimensions = stream.config.get('dimensions', (1024, 1024)) + width, height = dimensions + + # Reshape data to dimensions if needed + # Assuming data is raw bytes of 16-bit pixels + frame_array = { + 'value': data, + 'dimension': [ + {'size': height, 'offset': 0, 'fullSize': height, 'binning': 1, 'reverse': False}, + {'size': width, 'offset': 0, 'fullSize': width, 'binning': 1, 'reverse': False} + ] + } + + # Post to the PV + if stream.pv: + stream.pv.post(Value(stream.pv.nt, frame_array)) + + elif stream.data_type == 'ScalarArray': + # Format as scalar array + array_type = stream.config.get('type', 'd') + + # Post to the PV + if stream.pv: + stream.pv.post(Value(stream.pv.nt, {'value': data})) + + except queue.Empty: + pass # Queue was emptied by another task + except Exception as e: + self.logger.error(f"Error processing queue for {name}: {e}") + + # Small sleep to avoid tight loop + await asyncio.sleep(0.01) + + except Exception as e: + self.logger.error(f"Error in queue processor: {e}") + await asyncio.sleep(1.0) # Avoid rapid error loops + + self.logger.info("Queue processor stopped") + + async def periodic_status_update(self) -> None: + """Periodically publish status updates.""" + self.logger.info("Status updater started") + + while self.running: + try: + await self.publish_status() + await asyncio.sleep(STATUS_UPDATE_INTERVAL) + except Exception as e: + self.logger.error(f"Error in status updater: {e}") + await asyncio.sleep(1.0) # Avoid rapid error loops + + self.logger.info("Status updater stopped") + + # No longer need the _process_stream method as we now use queues for data distribution + + async def handle_requests(self) -> None: + """Handle incoming command requests.""" + while self.running: + try: + # Wait for a request + request = await self.cmd_socket.recv_string() + self.logger.info(f"Received request: {request}") + + # Forward the command to the status socket so clients can see it + command_info = { + 'type': 'command', + 'command': request, + 'timestamp': time.time() + } + await self.pub_socket.send_pyobj(command_info) + + try: + # Process the request + response = await self.handle_command(request) + + # Send the successful response + await self.cmd_socket.send_pyobj(response) + self.logger.info(f"Sent response: {response}") + + # Publish status update after command completion + await self.publish_status() + + # Also publish the command result + result_info = { + 'type': 'result', + 'command': request, + 'result': response, + 'timestamp': time.time() + } + await self.pub_socket.send_pyobj(result_info) + + except Exception as e: + # Send the exception directly as the error response + self.logger.error(f"Command error: {e}") + await self.cmd_socket.send_pyobj(e) + self.logger.info(f"Sent error: {e}") + + # Publish status update and error info + await self.publish_status() + + # Also publish the command error + error_info = { + 'type': 'error', + 'command': request, + 'error': str(e), + 'timestamp': time.time() + } + await self.pub_socket.send_pyobj(error_info) + + except Exception as e: + self.logger.error(f"Fatal error handling request: {e}") + # Try to send an error response if possible + try: + await self.cmd_socket.send_pyobj(Exception(f"Fatal server error: {e}")) + except: + pass + + async def run(self) -> None: + """ + Run the server. + + This starts all the necessary tasks and keeps the server running + until shutdown is called. + """ + self.running = True + + # Set up signal handlers + for sig in (signal.SIGINT, signal.SIGTERM): + asyncio.get_event_loop().add_signal_handler(sig, lambda: asyncio.create_task(self.shutdown())) + + # Start tasks + self._tasks = [ + asyncio.create_task(self.handle_requests()), + asyncio.create_task(self.periodic_status_update()), + asyncio.create_task(self.process_data_stream()), + asyncio.create_task(self.process_data_queues()) + ] + + # Log startup + self.logger.info(f"Server started on ports: {self.cmd_port} (commands), {self.pub_port} (status), {self.data_port} (data)") + + # Wait for tasks to complete + await asyncio.gather(*self._tasks, return_exceptions=True) + + async def shutdown(self) -> None: + """Shut down the server.""" + if not self.running: + return + + self.logger.info("Shutting down server...") + self.running = False + + # Cancel all tasks + for task in self._tasks: + task.cancel() + + # No subscribers to remove + + # Shut down KTest client if it exists + if hasattr(self, '_k_test_process') and self._k_test_process: self._k_test_process.terminate() - self._k_test_process.wait() - self.socket.close() + try: + self._k_test_process.wait(timeout=5) + except subprocess.TimeoutExpired: + self._k_test_process.kill() + + # Close ZMQ sockets + for socket in (self.cmd_socket, self.pub_socket, self.data_socket): + socket.close() + + # Terminate ZMQ context self.context.term() - os._exit(0) + + # Shut down EPICS PVA + if self.pva_server: + self.pva_server.stop() + + self.logger.info("Server shutdown complete") + + +#--------------------------------------------------------------------------- +# EPICS PVA Handler +#--------------------------------------------------------------------------- + +class PVAHandler: + """Handler for EPICS PV Access.""" + + def __init__(self, server, pva_name=None): + """ + Initialize the PVA handler. + + Args: + server: The Team1kServer instance + pva_name: The PVA channel name this handler is for + """ + self.server = server + self.pva_name = pva_name + self.logger = logging.getLogger("PVAHandler") + + def put(self, pv, op): + """Called when a client tries to update a PV.""" + self.logger.warning(f"Client attempted to put to read-only PV: {op.name()}") + op.done(error="PV is read-only") + + def rpc(self, pv, op): + """Called when a client makes an RPC call.""" + self.logger.warning(f"Client attempted RPC on PV: {op.name()}") + op.done(error="RPC not supported") + + def onFirstConnect(self, pv): + """Called when the first client connects to this PV.""" + pv_name = self.pva_name or "unknown" + self.logger.info(f"First client connected to PV: {pv_name}") + + # Increment connected clients count if this is a data stream + if self.pva_name and self.pva_name in self.server.data_streams: + stream = self.server.data_streams[self.pva_name] + stream.connected_clients += 1 + + def onLastDisconnect(self, pv): + """Called when the last client disconnects from this PV.""" + pv_name = self.pva_name or "unknown" + self.logger.info(f"Last client disconnected from PV: {pv_name}") + + # Decrement connected clients count if this is a data stream + if self.pva_name and self.pva_name in self.server.data_streams: + stream = self.server.data_streams[self.pva_name] + if stream.connected_clients > 0: + stream.connected_clients -= 1#--------------------------------------------------------------------------- +# Main entry point +#--------------------------------------------------------------------------- + +def main(): + """Main entry point for the server.""" + # Configure logging + logging.basicConfig(level=logging.INFO) + + # Create and run the server + server = Team1kServer() + + try: + # Run the server + asyncio.run(server.run()) + except KeyboardInterrupt: + logging.info("Keyboard interrupt received, shutting down") + except Exception as e: + logging.error(f"Error running server: {e}") + return 1 + + return 0 + + +if __name__ == "__main__": + sys.exit(main()) +