This commit is contained in:
Sebastian Strempfer
2025-10-03 10:12:44 -07:00
parent ccd3c0a1f9
commit d7e8a093e0
10 changed files with 1905 additions and 167 deletions

View File

@@ -25,6 +25,3 @@ team1k-server = "team1k.server:main"
where = ["src"]
include = ["team1k*"]
exclude = ["tests*"]
[tool.setuptools]
packages = ["team1k"]

View File

@@ -0,0 +1,152 @@
Metadata-Version: 2.4
Name: team1k
Version: 0.0.1
Summary: Controls for the TEAM1k detector
Author-email: Sebastian Strempfer <sstrempfer@anl.gov>
Requires-Python: >=3.9
Description-Content-Type: text/markdown
Requires-Dist: numpy>=1.24.0
Requires-Dist: pyserial>=3.5
Requires-Dist: pydantic>=2.0.0
Requires-Dist: typing-extensions>=4.5.0
# Team1k: X-ray Detector Control System
This package provides control software for the Team1k X-ray detector system, including:
1. A server that manages:
- Camera bellow stage (linear motion)
- Power supply control
- Detector control via KTest commands
- Data streaming to EPICS PVA
2. Client libraries for both synchronous and asynchronous operation
## Installation
```bash
pip install -e .
```
## Server Usage
Start the Team1k server:
```bash
team1k-server
```
The server exposes:
- ZMQ REP socket on port 42003 for commands
- ZMQ PUB socket on port 42004 for status updates
- ZMQ SUB socket on port 42005 for data from NetworkInterface.cxx
- EPICS PVA for data streaming (if p4p is installed)
## Client Usage
### Basic Example
```python
from team1k import Client, ExposureModes, TriggerModes
# Create a client
client = Client('localhost')
# Control the bellow stage
client.insert_detector()
client.retract_detector()
# Control the power supply
client.power_supply_on()
voltages = client.read_voltages()
client.power_supply_off()
# Configure the detector
client.set_exposure_mode(ExposureModes.ROLLING_SHUTTER)
client.set_trigger_mode(TriggerModes.INTERNAL_TRIGGER)
client.set_integration_time(100.0) # ms
# Start/stop acquisition
client.start_daq()
client.stop_daq()
# Close the client
client.close()
```
### Async Client
```python
import asyncio
from team1k import AsyncClient
async def main():
# Create an async client
client = AsyncClient('localhost')
# Subscribe to status updates
def status_callback(status):
print(f"Status update: {status}")
await client.subscribe_to_status(status_callback)
# Get current status
status = await client.get_status()
print(f"Current status: {status}")
# Insert detector
await client.insert_detector()
# Close client
await client.close()
asyncio.run(main())
```
## Example Scripts
The `examples` directory contains sample scripts:
- `client_demo.py`: Demonstrates synchronous and asynchronous client usage
- `status_monitor.py`: Real-time monitoring of system status
## Server Commands
The server accepts the following commands:
| Command | Description |
|---------|-------------|
| `insert` | Insert the detector into the beam |
| `retract` | Retract the detector from the beam |
| `power_on` | Turn on the detector power supply |
| `power_off` | Turn off the detector power supply |
| `read_voltages` | Read the voltage values from the power supply |
| `get_status` | Get the current status of all devices |
| `subscribe PORT` | Subscribe to status updates with a callback port |
| `unsubscribe` | Unsubscribe from status updates |
| `kill` | Shut down the server |
Additionally, all commands from the KTestCommand enum are supported.
## Architecture
The system uses a distributed architecture:
1. **Team1k Server**:
- Manages all devices
- Provides command interface
- Publishes status updates
- Streams data to EPICS PVA
2. **NetworkInterface.cxx**:
- Communicates with the detector hardware
- Streams data to a ZMQ socket
3. **Team1k Client**:
- Connects to the server
- Sends commands
- Receives status updates
4. **EPICS PVA**:
- Provides standard interface for data acquisition systems
- Allows integration with other EPICS tools

View File

@@ -0,0 +1,12 @@
README.md
pyproject.toml
src/team1k/Client.py
src/team1k/KTestClient.py
src/team1k/__init__.py
src/team1k/server.py
src/team1k.egg-info/PKG-INFO
src/team1k.egg-info/SOURCES.txt
src/team1k.egg-info/dependency_links.txt
src/team1k.egg-info/entry_points.txt
src/team1k.egg-info/requires.txt
src/team1k.egg-info/top_level.txt

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1,2 @@
[console_scripts]
team1k-server = team1k.server:main

View File

@@ -0,0 +1,4 @@
numpy>=1.24.0
pyserial>=3.5
pydantic>=2.0.0
typing-extensions>=4.5.0

View File

@@ -0,0 +1 @@
team1k

View File

@@ -1,64 +1,598 @@
import socket
#!/usr/bin/env python3
"""
Team1k Client - Interface to the Team1k detector control server.
"""
import uuid
import time
from KTestClient import KTestClient, KTestCommand, KTestError, ExposureModes, TriggerModes
import socket
import asyncio
import threading
from typing import Dict, List, Any, Optional, Tuple, Union, Callable
import zmq
import zmq.asyncio
from .KTestClient import KTestClient, KTestCommand, KTestError, ExposureModes, TriggerModes
class CommandException(Exception):
"""Exception raised when a command fails on the server side."""
pass
class Client:
def __init__(self, host: str = 'localhost', port: int = 42003):
self.host = host
self.port = port
self.k_test_client = KTestClient(host, port)
"""
Client for the Team1k server.
def send_command(self, command: KTestCommand, *args) -> str:
Provides methods to send commands and receive status updates.
"""
def __init__(self, host: str = 'localhost', cmd_port: int = 42003, pub_port: int = 42004):
"""
Send a command to the KTestServer and return the response.
Initialize the Team1k client.
Args:
command (KTestCommand): Command enum to send
*args: Arguments for the command
Returns:
str: Response message
host: The hostname of the server
cmd_port: The command socket port
pub_port: The status publication port
"""
return self.k_test_client.send_command(command, *args)
self.host = host
self.cmd_port = cmd_port
self.pub_port = pub_port
def reset_connection(self) -> None:
# Set up ZMQ for commands
self.context = zmq.Context()
self.cmd_socket = self.context.socket(zmq.REQ)
self.cmd_socket.connect(f"tcp://{host}:{cmd_port}")
# Status subscriber
self.sub_socket = None
self._status_thread = None
self._status_callback = None
self._status_running = False
self._last_status = None
def send_command(self, command: Union[str, KTestCommand], *args) -> Any:
"""
Send a command to the server.
Args:
command: A string command or KTestCommand enum
*args: Arguments for the command
Returns:
The server response
Raises:
CommandException: If the command fails on the server side
Exception: For other errors
"""
# Format command string
if isinstance(command, KTestCommand):
cmd_str = f"{command.value.lower()} {' '.join(str(arg) for arg in args)}"
else:
cmd_str = f"{command} {' '.join(str(arg) for arg in args)}"
# Send command
self.cmd_socket.send_string(cmd_str.strip())
# Get response
response = self.cmd_socket.recv_pyobj()
# Handle exceptions from the server
if isinstance(response, Exception):
if isinstance(response, (ValueError, RuntimeError)):
raise CommandException(str(response))
else:
raise response
return response
def subscribe_to_status(self, callback: Callable[[Dict[str, Any]], None]) -> None:
"""
Subscribe to status updates.
Args:
callback: Function to call with status updates
"""
# Stop existing subscription if there is one
self.unsubscribe_from_status()
# Create a ZMQ SUB socket
sub_socket = self.context.socket(zmq.SUB)
sub_socket.setsockopt_string(zmq.SUBSCRIBE, '')
sub_socket.connect(f"tcp://{self.host}:{self.pub_port}")
# Set up status thread
self.sub_socket = sub_socket
self._status_callback = callback
self._status_running = True
self._status_thread = threading.Thread(target=self._status_listener, daemon=True)
self._status_thread.start()
def unsubscribe_from_status(self) -> None:
"""Stop receiving status updates."""
if not self._status_running:
return
# Stop the status thread
self._status_running = False
if self._status_thread:
self._status_thread.join(timeout=1.0)
# Close the socket
if self.sub_socket:
self.sub_socket.close()
self.sub_socket = None
def _status_listener(self) -> None:
"""Background thread to listen for status updates."""
poller = zmq.Poller()
poller.register(self.sub_socket, zmq.POLLIN)
while self._status_running:
try:
socks = dict(poller.poll(timeout=1000))
if self.sub_socket in socks:
status = self.sub_socket.recv_pyobj()
# Store status
self._last_status = status
# Call callback
if self._status_callback:
self._status_callback(status)
except Exception as e:
print(f"Error in status listener: {e}")
def get_status(self) -> Dict[str, Any]:
"""
Get the current status from the server.
Returns:
The current system status
"""
if self._last_status is not None:
return self._last_status
return self.send_command("get_status")
def insert_detector(self) -> str:
"""
Insert the detector into the beam.
Returns:
Success message
"""
return self.send_command("insert")
def retract_detector(self) -> str:
"""
Retract the detector from the beam.
Returns:
Success message
"""
return self.send_command("retract")
def power_supply_on(self) -> str:
"""
Turn on the detector power supply.
Returns:
Success message
"""
return self.send_command("power_on")
def power_supply_off(self) -> str:
"""
Turn off the detector power supply.
Returns:
Success message
"""
return self.send_command("power_off")
def read_voltages(self) -> Dict[str, List[float]]:
"""
Read the power supply voltages.
Returns:
Dictionary with voltage values
"""
return self.send_command("read_voltages")
# KTest command convenience methods
def reset_connection(self) -> Any:
"""
Reset the connection between the k_test server and the detector.
"""
response = self.send_command(KTestCommand.RESET_CONNECTION)
time.sleep(1) # Wait for the connection to stabilize
def start_daq(self) -> None:
Returns:
Server response
"""
return self.send_command(KTestCommand.RESET_CONNECTION)
def start_daq(self) -> Any:
"""
Start the data acquisition process.
"""
self.send_command(KTestCommand.RESTART_DAQ)
def stop_daq(self) -> None:
Returns:
Server response
"""
return self.send_command(KTestCommand.RESTART_DAQ)
def stop_daq(self) -> Any:
"""
Stop the data acquisition process.
"""
self.send_command(KTestCommand.STOP_DAQ)
def set_exposure_mode(self, exposure_mode: ExposureModes) -> None:
Returns:
Server response
"""
return self.send_command(KTestCommand.STOP_DAQ)
def set_exposure_mode(self, exposure_mode: ExposureModes) -> Any:
"""
Set the exposure mode for the detector.
"""
self.send_command(KTestCommand.SET_EXPOSURE_MODE, exposure_mode)
def set_trigger_mode(self, trigger_mode: TriggerModes) -> None:
Args:
exposure_mode: The exposure mode to set
Returns:
Server response
"""
return self.send_command(KTestCommand.SET_EXPOSURE_MODE, exposure_mode)
def set_trigger_mode(self, trigger_mode: TriggerModes) -> Any:
"""
Set the trigger mode for the detector.
"""
self.send_command(KTestCommand.SET_TRIGGER_MODE, trigger_mode)
def set_integration_time(self, integration_time_ms: float) -> None:
Args:
trigger_mode: The trigger mode to set
Returns:
Server response
"""
return self.send_command(KTestCommand.SET_TRIGGER_MODE, trigger_mode)
def set_integration_time(self, integration_time_ms: float) -> Any:
"""
Set the integration time in milliseconds.
"""
self.send_command(KTestCommand.SET_INTEGRATION_TIME, integration_time_ms)
def load_parameter_file(self, file_path: str) -> None:
Args:
integration_time_ms: The integration time in milliseconds
Returns:
Server response
"""
return self.send_command(KTestCommand.SET_INTEGRATION_TIME, integration_time_ms)
def load_parameter_file(self, file_path: str) -> Any:
"""
Load a parameter file for the detector.
Args:
file_path: Path to the parameter file
Returns:
Server response
"""
self.send_command(KTestCommand.LOAD_PARAMETER_FILE, file_path)
return self.send_command(KTestCommand.LOAD_PARAMETER_FILE, file_path)
def close(self) -> None:
"""Close the client connections."""
# Unsubscribe if needed
self.unsubscribe_from_status()
# Close command socket
if self.cmd_socket:
self.cmd_socket.close()
# Terminate context
if self.context:
self.context.term()
class AsyncClient:
"""
Asynchronous client for the Team1k server.
Provides async methods to send commands and receive status updates.
"""
def __init__(self, host: str = 'localhost', cmd_port: int = 42003, pub_port: int = 42004):
"""
Initialize the asynchronous Team1k client.
Args:
host: The hostname of the server
cmd_port: The command socket port
pub_port: The status publication port
"""
self.host = host
self.cmd_port = cmd_port
self.pub_port = pub_port
# Set up ZMQ for commands
self.context = zmq.asyncio.Context()
self.cmd_socket = self.context.socket(zmq.REQ)
self.cmd_socket.connect(f"tcp://{host}:{cmd_port}")
# Status subscriber
self.sub_socket = None
self._status_task = None
self._status_callbacks = []
self._status_running = False
self._last_status = None
async def send_command(self, command: Union[str, KTestCommand], *args) -> Any:
"""
Send a command to the server asynchronously.
Args:
command: A string command or KTestCommand enum
*args: Arguments for the command
Returns:
The server response
Raises:
CommandException: If the command fails on the server side
Exception: For other errors
"""
# Format command string
if isinstance(command, KTestCommand):
cmd_str = f"{command.value.lower()} {' '.join(str(arg) for arg in args)}"
else:
cmd_str = f"{command} {' '.join(str(arg) for arg in args)}"
# Send command
await self.cmd_socket.send_string(cmd_str.strip())
# Get response
response = await self.cmd_socket.recv_pyobj()
# Handle exceptions from the server
if isinstance(response, Exception):
if isinstance(response, (ValueError, RuntimeError)):
raise CommandException(str(response))
else:
raise response
return response
async def subscribe_to_status(self, callback: Callable[[Dict[str, Any]], None]) -> None:
"""
Subscribe to status updates asynchronously.
Args:
callback: Function to call with status updates
"""
# Add callback if we're already subscribed
if self._status_running and self.sub_socket:
self._status_callbacks.append(callback)
return
# Create a ZMQ SUB socket
sub_socket = self.context.socket(zmq.SUB)
sub_socket.setsockopt_string(zmq.SUBSCRIBE, '')
sub_socket.connect(f"tcp://{self.host}:{self.pub_port}")
# Set up status task
self.sub_socket = sub_socket
self._status_callbacks = [callback]
self._status_running = True
self._status_task = asyncio.create_task(self._status_listener())
async def unsubscribe_from_status(self) -> None:
"""Stop receiving status updates asynchronously."""
if not self._status_running:
return
# Stop the status task
self._status_running = False
if self._status_task:
self._status_task.cancel()
try:
await self._status_task
except asyncio.CancelledError:
pass
# Close the socket
if self.sub_socket:
self.sub_socket.close()
self.sub_socket = None
async def _status_listener(self) -> None:
"""Background task to listen for status updates."""
while self._status_running:
try:
# Receive message
status = await self.sub_socket.recv_pyobj()
# Store status
self._last_status = status
# Call callbacks
for callback in self._status_callbacks:
callback(status)
except asyncio.CancelledError:
# Task was cancelled
break
except Exception as e:
print(f"Error in status listener: {e}")
async def get_status(self) -> Dict[str, Any]:
"""
Get the current status from the server asynchronously.
Returns:
The current system status
"""
if self._last_status is not None:
return self._last_status
return await self.send_command("get_status")
async def insert_detector(self) -> str:
"""
Insert the detector into the beam asynchronously.
Returns:
Success message
"""
return await self.send_command("insert")
async def retract_detector(self) -> str:
"""
Retract the detector from the beam asynchronously.
Returns:
Success message
"""
return await self.send_command("retract")
async def power_supply_on(self) -> str:
"""
Turn on the detector power supply asynchronously.
Returns:
Success message
"""
return await self.send_command("power_on")
async def power_supply_off(self) -> str:
"""
Turn off the detector power supply asynchronously.
Returns:
Success message
"""
return await self.send_command("power_off")
async def read_voltages(self) -> Dict[str, List[float]]:
"""
Read the power supply voltages asynchronously.
Returns:
Dictionary with voltage values
"""
return await self.send_command("read_voltages")
# KTest command convenience methods
async def reset_connection(self) -> Any:
"""
Reset the connection between the k_test server and the detector asynchronously.
Returns:
Server response
"""
return await self.send_command(KTestCommand.RESET_CONNECTION)
async def start_daq(self) -> Any:
"""
Start the data acquisition process asynchronously.
Returns:
Server response
"""
return await self.send_command(KTestCommand.RESTART_DAQ)
async def stop_daq(self) -> Any:
"""
Stop the data acquisition process asynchronously.
Returns:
Server response
"""
return await self.send_command(KTestCommand.STOP_DAQ)
async def set_exposure_mode(self, exposure_mode: ExposureModes) -> Any:
"""
Set the exposure mode for the detector asynchronously.
Args:
exposure_mode: The exposure mode to set
Returns:
Server response
"""
return await self.send_command(KTestCommand.SET_EXPOSURE_MODE, exposure_mode)
async def set_trigger_mode(self, trigger_mode: TriggerModes) -> Any:
"""
Set the trigger mode for the detector asynchronously.
Args:
trigger_mode: The trigger mode to set
Returns:
Server response
"""
return await self.send_command(KTestCommand.SET_TRIGGER_MODE, trigger_mode)
async def set_integration_time(self, integration_time_ms: float) -> Any:
"""
Set the integration time in milliseconds asynchronously.
Args:
integration_time_ms: The integration time in milliseconds
Returns:
Server response
"""
return await self.send_command(KTestCommand.SET_INTEGRATION_TIME, integration_time_ms)
async def load_parameter_file(self, file_path: str) -> Any:
"""
Load a parameter file for the detector asynchronously.
Args:
file_path: Path to the parameter file
Returns:
Server response
"""
return await self.send_command(KTestCommand.LOAD_PARAMETER_FILE, file_path)
async def close(self) -> None:
"""Close the client connections asynchronously."""
# Unsubscribe if needed
await self.unsubscribe_from_status()
# Close command socket
if self.cmd_socket:
self.cmd_socket.close()
# Terminate context
if self.context:
self.context.term()
# Example usage
if __name__ == "__main__":
# Synchronous example
client = Client()
print("Synchronous client:")
try:
print("Status:", client.get_status())
print("Voltages:", client.read_voltages())
except CommandException as e:
print(f"Command failed: {e}")
client.close()
# Asynchronous example
async def main():
client = AsyncClient()
print("\nAsynchronous client:")
try:
status = await client.get_status()
print("Status:", status)
voltages = await client.read_voltages()
print("Voltages:", voltages)
except CommandException as e:
print(f"Command failed: {e}")
await client.close()
asyncio.run(main())

View File

@@ -1,2 +1,8 @@
from .Client import Client, KTestCommand, KTestError, ExposureModes, TriggerModes
__all__ = ['Client', 'KTestCommand', 'KTestError', 'ExposureModes', 'TriggerModes']
from .Client import Client, AsyncClient
from .KTestClient import KTestCommand, KTestError, ExposureModes, TriggerModes
__all__ = [
'Client', 'AsyncClient',
'KTestCommand', 'KTestError',
'ExposureModes', 'TriggerModes'
]

File diff suppressed because it is too large Load Diff