from typing import Tuple
from annotypes import Any, add_call_types
from malcolm.core import PartRegistrar
from malcolm.modules import builtin, scanning
from malcolm.modules.scanning.hooks import AGenerator
# How big an XML file can the EPICS waveform receive?
XML_MAX_SIZE = 1000000 - 2
# How many to load each time
POSITIONS_PER_XML = 5000
# How far to load ahead
N_LOAD_AHEAD = 4
# We will set these attributes on the child block, so don't save them
[docs]@builtin.util.no_save("xml", "enableCallbacks", "idStart", "qty", "arrayCounter")
class PositionLabellerPart(builtin.parts.ChildPart):
"""Part for controlling a `position_labeller_block` in a scan"""
# Stored generator for positions
generator: AGenerator = None
# The last index we have loaded
end_index = None
# Future for plugin run
start_future = None
# If we are currently loading then block loading more points
loading = None
# The uniqueID of the last point in the scan
id_end = 0
[docs] def setup(self, registrar: PartRegistrar) -> None:
super().setup(registrar)
# Hooks
registrar.hook(
(scanning.hooks.ConfigureHook, scanning.hooks.SeekHook,), self.on_configure,
)
registrar.hook(scanning.hooks.RunHook, self.on_run)
registrar.hook(
(scanning.hooks.AbortHook, scanning.hooks.PauseHook), self.on_abort
)
[docs] @add_call_types
def on_reset(self, context: scanning.hooks.AContext) -> None:
super().on_reset(context)
self.on_abort(context)
[docs] @add_call_types
def on_run(self, context: scanning.hooks.AContext) -> None:
self.loading = False
child = context.block_view(self.mri)
child.qty.subscribe_value(self.load_more_positions, child)
[docs] @add_call_types
def on_abort(self, context: scanning.hooks.AContext) -> None:
child = context.block_view(self.mri)
child.stop()
def load_more_positions(self, number_left: int, child: Any) -> None:
if self.end_index and self.generator.size:
if (
not self.loading
and self.end_index < self.generator.size
and number_left < POSITIONS_PER_XML * N_LOAD_AHEAD
):
self.loading = True
xml, self.end_index = self._make_xml(self.end_index)
child.xml.put_value(xml)
self.loading = False
def _make_xml(self, start_index: int) -> Tuple[str, int]:
# Make xml root
xml = '<?xml version="1.0" ?><pos_layout><dimensions>'
# Make an index for every hdf index
assert self.generator, "No generator"
for i in range(len(self.generator.dimensions)):
xml += '<dimension name="d%d" />' % i
# Add the actual positions
xml += "</dimensions><positions>"
end_index = start_index + POSITIONS_PER_XML
assert self.generator.size, "Generator is empty"
if end_index > self.generator.size:
end_index = self.generator.size
for i in range(start_index, end_index):
point = self.generator.get_point(i)
xml += "<position"
for j, value in enumerate(point.indexes):
xml += ' d%d="%s"' % (j, value)
xml += " />"
xml += "</positions></pos_layout>"
xml_length = len(xml)
assert xml_length < XML_MAX_SIZE, "XML size %d too big" % xml_length
return xml, end_index