Skip to content

Topology API

Creating, loading and running topologies

Construct a topology using this constructor

Topology

Bases: TopologyApi

Source code in hyrrokkin/api/topology.py
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
class Topology(TopologyApi):

    def __init__(self, topology_folder: str, package_list: list[str], temporary_folder: str | None = None,
                 engine_launcher: Union[EngineLauncher, None] = None,
                 read_only: bool = False,
                 set_engine_pid_callback: Callable[[int], None] | None = None,
                 import_from_path: str | None = None
                 ):
        """
        Create a topology instance

        Args:
            topology_folder: the folder used to store the topology's definition and files
            package_list: a list of the paths to python packages containing hyrrokkin package schemas (a schema.json file)
            temporary_folder: a folder used to store files temporarily during execution
            engine_launcher: the engine_launcher to use to run the topology in a remote process.  if not specified, select an appropriate one
                             for the packages loaded
            read_only: if true, do not allow nodes and configurations to persist data/properties changes to disk when running the topology
            set_engine_pid_callback: function that is called with the process identifier (PID) of the engine process if the engine is launched in a sub-process
            import_from_path: import from this path to initialise the topology
        """
        # set to True by close(); methods wrapped with check_notclosed raise once it is True
        self.closed = False
        # paused flag, initially True; updated by start(), pause() and resume()
        self.paused = True
        # re-entrant lock, acquired directly by run() and run_task()
        # (presumably also what the @threadsafe decorator uses - TODO confirm, decorator not shown here)
        self.lock = threading.RLock()
        self.set_engine_pid_callback = set_engine_pid_callback
        # the core topology is created and loaded from topology_folder before anything else reads the schema
        self.topology = CoreTopology(topology_folder, package_list)
        self.topology.load_dir()
        self.schema = SchemaType(self.topology.get_schema())
        # set by execution_complete_handler(); run() blocks on it
        self.execution_complete_event = Event()
        self.serde = Serde(self.topology.get_schema())

        # NOTE(review): this calls the core import directly, so unlike the import_from() method there is
        # no ".zip" branch here and no listener is notified (none can be attached yet)
        if import_from_path is not None:
            self.topology.import_from(import_from_path)

        # the settings below are passed to open_runner() when start() is called
        self.temporary_folder = temporary_folder
        self.engine_launcher = engine_launcher
        self.read_only = read_only

        # None until start() opens a runner
        self.runner = None

        # keyed by (node_id, output_port_name); re-registered on the runner by start()
        self.output_listeners = {}
        # keyed by (node_id, input_port_name); re-injected into the runner by start()
        self.injected_input_values = {}

        self.listeners: list[TopologyListenerAPI] = []
        # (target_type, target_id) -> (msg, status_code); replayed to listeners by attach_listener()
        self.status_events = {}
        # node_id -> (timestamp, state, error, is_manual); replayed to listeners by attach_listener()
        self.execution_events = {}

    def __repr__(self):
        """Return a short summary string giving the number of nodes and links."""
        node_count = len(self.get_nodes())
        link_count = len(self.get_links())
        return f"hyrrokkin.api.Topology(#nodes={node_count},#links={link_count})"

    def get_package_path(self, package_id):
        return self.topology.get_schema().get_package_path(package_id)

    def close(self):
        self.closed = True
        if self.runner is not None:
            self.runner.stop()
            self.runner.join()
            self.runner.close()

    # methods implementing the TopologyApi interface

    @check_notclosed
    @threadsafe
    def set_metadata(self, metadata: dict[str, str], ref: str | None = None):
        self.topology.set_metadata(metadata)
        for listener in self.listeners:
            listener.design_metadata_updated(metadata, ref=ref)

    @check_notclosed
    @threadsafe
    def add_node(self, node_id: str | None, node_type: str, metadata: dict[str, JsonType] = {},
                 properties: dict[str, JsonType] = {},
                 data: dict[str, bytes] = {}, copy_from_node_id: str = "", ref: str | None = None) -> str:
        """
        Add a node to the core topology and tell every attached listener

        Args:
            node_id: the node's requested unique identifier or None
            node_type: the type of the node, a string of the form package_id:node_type_id
            metadata: a dictionary containing the node's metadata
            properties: dictionary containing the node's property names and values
            data: dictionary of named binary data for the node
            copy_from_node_id: passed through to the core topology's add_node
            ref: an optional reference that identifies this request

        Returns:
            the node id returned by the core topology
        """
        assigned_id = self.topology.add_node(node_id, node_type, metadata, properties, data, copy_from_node_id)
        for subscriber in self.listeners:
            subscriber.node_added(assigned_id, node_type, metadata, ref=ref)
        return assigned_id

    @check_notclosed
    @threadsafe
    def update_node_metadata(self, node_id: str | None, metadata: dict[str, JsonType] = {},
                             ref: str | None = None) -> None:
        """
        Update a node's metadata in the core topology and tell every attached listener

        Args:
            node_id: the node's unique identifier
            metadata: a dictionary containing the new metadata
            ref: an optional reference that identifies this request
        """
        self.topology.update_node_metadata(node_id, metadata)
        for subscriber in self.listeners:
            subscriber.node_metadata_updated(node_id, metadata, ref=ref)

    @check_notclosed
    @threadsafe
    def remove_node(self, node_id: str, ref: str | None = None):
        self.topology.remove_node(node_id)
        for listener in self.listeners:
            listener.node_removed(node_id, ref=ref)

    @check_notclosed
    @threadsafe
    def add_link(self, link_id: str | None, from_node_id: str, from_port: str | None, to_node_id: str,
                 to_port: str | None, ref: str | None = None):
        link_id = self.topology.add_link(link_id, from_node_id, from_port, to_node_id, to_port)
        link_type = self.topology.get_link_type(link_id)
        for listener in self.listeners:
            listener.link_added(link_id, link_type, from_node_id, from_port, to_node_id, to_port, ref=ref)
        return link_id

    @check_notclosed
    @threadsafe
    def remove_link(self, link_id: str, ref: str | None = None):
        self.topology.remove_link(link_id)
        for listener in self.listeners:
            listener.link_removed(link_id, ref=ref)

    @check_notclosed
    @threadsafe
    def clear(self, ref: str | None = None):
        self.topology.clear()
        for listener in self.listeners:
            listener.clear(ref=ref)

    @check_notclosed
    @threadsafe
    def start(self, paused=False, ref: str | None = None):
        self.runner = self.topology.open_runner(self.temporary_folder,
                                                status_event_handler=lambda target_type, target_id, msg,
                                                                            status_code: self.status_event_handler(
                                                    target_type, target_id, msg, status_code),
                                                execution_event_handler=lambda timestamp, node_id, state, error,
                                                                               is_manual: self.execution_event_handler(
                                                    timestamp, node_id, state, error, is_manual),
                                                engine_launcher=self.engine_launcher,
                                                read_only=self.read_only,
                                                paused=True,
                                                set_engine_pid_callback=self.set_engine_pid_callback)
        self.runner.set_execution_complete_callback(lambda: self.execution_complete_handler())
        self.runner.set_request_open_client_callback(
            lambda origin_id, origin_type, session_id, client_name: self.request_open_client(origin_id, origin_type,
                                                                                             session_id, client_name))
        for (node_id, input_port_name) in self.injected_input_values:
            self.runner.inject_input_value(node_id, input_port_name, self.injected_input_values[(node_id, input_port_name)])
        for (node_id, output_port_name) in self.output_listeners:
            self.runner.add_output_listener(node_id, output_port_name, self.output_listeners[(node_id, output_port_name)])

        self.runner.start()

        for listener in self.listeners:
            listener.started(ref=ref)

        self.paused = paused

        if not paused:
            self.runner.resume()
            for listener in self.listeners:
                listener.resumed(ref=ref)

    @check_notclosed
    @threadsafe
    def is_started(self):
        return self.runner is not None

    @check_notclosed
    @threadsafe
    def pause(self, ref: str | None = None):
        if self.paused is False:
            self.paused = True
            if self.runner is not None:
                self.runner.pause()
            for listener in self.listeners:
                listener.paused(ref=ref)

    @check_notclosed
    @threadsafe
    def resume(self, ref: str | None = None):
        if self.paused is True:
            self.paused = False
            if not self.runner:
                self.start(ref)
            else:
                self.runner.resume()
            for listener in self.listeners:
                listener.resumed(ref=ref)

    @check_notclosed
    @threadsafe
    def is_paused(self):
        return self.paused

    @check_notclosed
    @threadsafe
    def restart(self, ref: str | None = None):
        for listener in self.listeners:
            listener.restarting(ref=ref)

        if self.runner:

            self.runner.restart()

            for node_id in self.get_nodes():
                timestamp = time.time()
                state = "pending"
                self.execution_events[node_id] = (timestamp, state, "", False)
                for listener in self.listeners:
                    listener.execution_event(timestamp, node_id, state, "", False)

        for listener in self.listeners:
            listener.restarted(ref=ref)

    @check_notclosed
    def run(self):
        """
        Resume the topology, block until execution completes, then pause it again

        Returns:
            the value of the runner's get_failures(), read while holding this topology's lock
        """
        self.resume()
        # NOTE(review): the event is set by execution_complete_handler() and is not cleared anywhere
        # in the code visible here, so a second call may not block - confirm against the runner
        self.execution_complete_event.wait()
        self.pause()
        with self.lock:
            failures = self.runner.get_failures()
        return failures

    @check_notclosed
    def run_task(self, task_name, input_port_values, output_ports, ref: str | None = None, decode_outputs=True):
        if self.runner is None:
            self.start(ref)
        for key in input_port_values:
            node_id = key.split(":")[0]
            input_port_name = key.split(":")[1]
            (package_id, node_type_id) = self.topology.get_node_type(node_id)
            if not isinstance(input_port_values[key],bytes):
                input_port_values[key] = self.serde.serialise(input_port_values[key], package_id, node_type_id, input_port_name)
        with self.lock:
            task_id = self.runner.submit_task(task_name, input_port_values, output_ports)
        (output_values, failures) = self.runner.wait_for_task(task_id)
        for key in output_values:
            node_id = key.split(":")[0]
            output_port_name = key.split(":")[1]
            (package_id, node_type_id) = self.topology.get_node_type(node_id)
            if decode_outputs:
                output_values[key] = self.serde.deserialise(output_values[key], package_id, node_type_id,
                                                          output_port_name)
        return (output_values, failures)

    @check_notclosed
    @threadsafe
    def reload_node(self, node_id: str, properties: JsonType, data: dict[str, bytes], ref: str | None = None):
        """
        Replace a node's properties and data, then tell listeners the node was reloaded

        Without a runner the values are written straight to the core topology and listeners are
        notified immediately. With a runner the reload is handed to it, and the listeners attached
        at the time of this call are notified from the runner's on_reload callback.

        Args:
            node_id: the id of the node to reload
            properties: the new properties for the node
            data: dictionary mapping data keys to binary values
            ref: an optional reference that identifies this request
        """
        if self.runner is None:
            self.topology.set_node_properties(node_id, properties)
            for data_key, data_value in data.items():
                self.topology.set_node_data(node_id, data_key, data_value)
            for subscriber in self.listeners:
                subscriber.node_reloaded(node_id, ref=ref)
            return

        # copy the list so the callback notifies the listeners attached now
        subscribers_now = list(self.listeners)

        def notify_reloaded():
            for subscriber in subscribers_now:
                subscriber.node_reloaded(node_id, ref=ref)

        self.runner.reload_node(node_id, properties, data, on_reload=notify_reloaded)

    @check_notclosed
    @threadsafe
    def add_output_listener(self, node_id: str, output_port_name: str, listener: Callable[[Any], None], decode_outputs:bool=True):
        if not decode_outputs:
            self.output_listeners[(node_id, output_port_name)] = listener
            if self.runner is not None:
                self.runner.add_output_listener(node_id, output_port_name, listener)

        else:
            (package_id, node_type_id) = self.topology.get_node_type(node_id)

            def decoding_listener(value):
                value = self.serde.deserialise(value, package_id, node_type_id, output_port_name)
                listener(value)

            self.output_listeners[(node_id, output_port_name)] = decoding_listener

            if self.runner is not None:
                self.runner.add_output_listener(node_id, output_port_name, decoding_listener)

    @check_notclosed
    @threadsafe
    def remove_output_listener(self, node_id: str, output_port_name: str):
        del self.output_listeners[(node_id, output_port_name)]
        if self.runner is not None:
            self.runner.remove_output_listener(node_id, output_port_name)

    @check_notclosed
    @threadsafe
    def inject_input_value(self, node_id: str, input_port_name: str, value: Any):
        (package_id, node_type_id) = self.topology.get_node_type(node_id)

        if not isinstance(value, bytes):
            value = self.serde.serialise(value, package_id, node_type_id, input_port_name)

        self.injected_input_values[(node_id, input_port_name)] = value
        if self.runner is not None:
            self.runner.inject_input_value(node_id, input_port_name, value)

    @check_notclosed
    @threadsafe
    def inject_input_values(self, node_id: str, input_port_name: str, values: list[Any]):
        (package_id, node_type_id) = self.topology.get_node_type(node_id)

        values = [self.serde.serialise(value, package_id, node_type_id, input_port_name)
                  if not isinstance(value,bytes)  else value for value in values]

        self.injected_input_values[(node_id, input_port_name)] = values
        if self.runner is not None:
            self.runner.inject_input_value(node_id, input_port_name, values)

    ####################################################################################################################
    # session and client related

    @check_notclosed
    @threadsafe
    def open_session(self, session_id: str | None = None) -> str:
        if self.runner is None:
            self.start(paused=self.paused)
        return self.runner.open_session(session_id)

    @check_notclosed
    @threadsafe
    def close_session(self, session_id: str):
        if self.runner is None:
            self.start(paused=self.paused)
        self.runner.close_session(session_id)

    @check_notclosed
    @threadsafe
    def attach_node_client(self, node_id: str, session_id: str = "", client_id: str = "",
                           client_options: dict = {}) -> ClientApi:
        """
        Attach a client to a node, starting the runner in the current paused state if needed

        Args:
            node_id: the id of the node to attach to
            session_id: the id of the session
            client_id: the id of the client
            client_options: options passed through to the runner

        Returns:
            the client object returned by the runner's attach_node_client
        """
        if self.runner is None:
            self.start(paused=self.paused)
        client = self.runner.attach_node_client(node_id, session_id, client_id, client_options)
        return client

    @check_notclosed
    @threadsafe
    def attach_configuration_client(self, package_id: str, session_id: str = "", client_id: str = "",
                                    client_options: dict = {}) -> ClientApi:
        """
        Attach a client to a package configuration, starting the runner if needed

        Args:
            package_id: the id of the package whose configuration to attach to
            session_id: the id of the session
            client_id: the id of the client
            client_options: options passed through to the runner

        Returns:
            the client object returned by the runner's attach_configuration_client
        """
        if self.runner is None:
            # keep the current paused state, as open_session/close_session/attach_node_client do.
            # a bare start() would un-pause the topology as a side effect of attaching a client
            self.start(paused=self.paused)
        return self.runner.attach_configuration_client(package_id, session_id, client_id, client_options)

    ####################################################################################################################
    # retrieve node properties and data

    @threadsafe
    def get_node_properties(self, node_id: str) -> dict[str, JsonType]:
        """
        Fetch a node's properties from the core topology

        Args:
            node_id: the id of the node

        Returns:
            the properties returned by the core topology for this node
        """
        properties = self.topology.get_node_properties(node_id)
        return properties

    @threadsafe
    def get_node_data(self, node_id: str, key: str) -> bytes | None:
        return self.topology.get_node_data(node_id, key)

    @threadsafe
    def get_node_data_keys(self, node_id: str) -> list[str]:
        return self.topology.get_node_data_keys(node_id)

    ####################################################################################################################
    # load and save

    def __note_added(self, added_node_ids, added_link_ids, ref):
        """
        Tell every attached listener about nodes and links that were just loaded or imported

        Nodes are announced first, then links. Details are read back from the core topology.

        Args:
            added_node_ids: ids of the nodes that were added
            added_link_ids: ids of the links that were added
            ref: an optional reference that identifies the originating request
        """
        core = self.topology

        for node_id in added_node_ids:
            (package_id, node_type_name) = core.get_node_type(node_id)
            # listeners receive the combined package_id:node_type form
            qualified_type = f"{package_id}:{node_type_name}"
            metadata = core.get_node_metadata(node_id)
            for subscriber in self.listeners:
                subscriber.node_added(node_id, qualified_type, metadata, ref=ref)

        for link_id in added_link_ids:
            (from_node_id, from_port_name, to_node_id, to_port_name) = core.get_link(link_id)
            link_type_id = core.get_link_type(link_id)
            for subscriber in self.listeners:
                subscriber.link_added(link_id, link_type_id, from_node_id, from_port_name, to_node_id,
                                      to_port_name, ref=ref)

    @threadsafe
    def load(self, from_file: io.BytesIO, include_data: bool = True, ref: str | None = None) -> Tuple[
        list[str], list[str], dict[str, str]]:
        """
        Load nodes and links from a zip file object and tell listeners what was added

        Args:
            from_file: a binary file object, passed to the core topology's load_zip
            include_data: passed to load_zip as include_data
            ref: an optional reference that identifies this request

        Returns:
            a tuple (added_node_ids, added_link_ids, node_renamings) as returned by load_zip
        """
        # configuration is only loaded while no runner exists: once the runner has been created the
        # configuration has been loaded into an engine and there is no real way to update it
        no_runner_yet = (self.runner is None)
        # metadata is only loaded when this topology currently has none
        metadata_is_empty = (self.get_metadata() == {})
        (added_node_ids, added_link_ids, node_renamings) = self.topology.load_zip(
            from_file,
            include_data=include_data,
            include_configuration=no_runner_yet,
            include_metadata=metadata_is_empty)
        self.__note_added(added_node_ids, added_link_ids, ref=ref)
        return (added_node_ids, added_link_ids, node_renamings)

    @threadsafe
    def save(self, to_file: io.BufferedWriter = None, include_data: bool = True):
        return self.topology.save_zip(to_file, include_data)

    @threadsafe
    def import_from(self, from_path: str, include_data: bool = True, ref: str | None = None) -> Tuple[
        list[str], list[str], dict[str, str]]:
        """
        Import nodes and links from a path and tell listeners what was added

        A path ending in ".zip" is opened and loaded with the core topology's load_zip. Any other
        path is handed to the core topology's import_from.

        Args:
            from_path: the path to import from
            include_data: passed through to the core topology
            ref: an optional reference that identifies this request

        Returns:
            a tuple (added_node_ids, added_link_ids, node_renamings)
        """
        # configuration is only imported while no runner exists: once the runner has been created the
        # configuration has been loaded into an engine and there is no real way to update it.
        # metadata is only imported when this topology currently has none
        if not from_path.endswith(".zip"):
            imported = self.topology.import_from(from_path, include_data,
                                                 include_configuration=(self.runner is None),
                                                 include_metadata=(self.get_metadata() == {}))
        else:
            with open(from_path, "rb") as zip_file:
                imported = self.topology.load_zip(zip_file, include_data=include_data,
                                                  include_configuration=(self.runner is None),
                                                  include_metadata=(self.get_metadata() == {}))

        (added_node_ids, added_link_ids, node_renamings) = imported
        self.__note_added(added_node_ids, added_link_ids, ref=ref)
        return (added_node_ids, added_link_ids, node_renamings)

    @threadsafe
    def export_to(self, to_path: str, include_data: bool = True):
        """
        Export the topology to a path, creating the parent directory if needed

        A path ending in ".zip" is written with save(). Any other path is handed to the core
        topology's export_to.

        Args:
            to_path: the path to export to
            include_data: passed through to save() or the core topology
        """
        parent_dir = os.path.dirname(to_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        if not to_path.endswith(".zip"):
            self.topology.export_to(to_path, include_data)
            return

        with open(to_path, "wb") as zip_file:
            self.save(zip_file, include_data=include_data)

    @threadsafe
    def serialise(self):
        return self.topology.serialise()

    ####################################################################################################################
    # topology introspection

    @threadsafe
    def get_metadata(self) -> dict[str, JsonType]:
        """Return the topology metadata held by the core topology."""
        metadata = self.topology.get_metadata()
        return metadata

    @threadsafe
    def get_nodes(self) -> dict[str, NodeTypeApi]:
        """
        Map each node id in the topology to its node type object from the schema

        Returns:
            a dictionary keyed by node id. each value is looked up in the schema using the
            package id and node type id found in the node's "package_id:node_type_id" type string
        """
        resolved = {}
        for node_id, node in self.topology.get_nodes().items():
            package_id, node_type_id = node.get_node_type().split(":")
            package = self.schema.get_packages()[package_id]
            resolved[node_id] = package.get_node_types()[node_type_id]
        return resolved

    @threadsafe
    def get_links(self) -> dict[str, tuple[LinkTypeApi, str, str]]:
        """
        Describe each link in the topology

        Returns:
            a dictionary keyed by link id. each value is a tuple (link_type, from_port_id, to_port_id)
            where the port ids have the form "node_id:port_name"
        """
        described = {}
        for link_id, link in self.topology.get_links().items():
            link_type = link.get_link_type()
            source_port_id = ":".join((link.from_node_id, link.from_port))
            target_port_id = ":".join((link.to_node_id, link.to_port))
            described[link_id] = (link_type, source_port_id, target_port_id)
        return described

    @threadsafe
    def get_link_ids_for_node(self, node_id: str) -> list[str]:
        return self.topology.get_link_ids_for_node(node_id)

    ####################################################################################################################
    # schema introspection

    @threadsafe
    def get_schema(self) -> SchemaTypeApi:
        """Return the schema object created for this topology by the constructor."""
        schema = self.schema
        return schema

    ####################################################################################################################
    # listeners

    def attach_listener(self, listener: TopologyListenerAPI) -> None:
        self.listeners.append(listener)
        for (target_type, target_id), (msg, status_code) in self.status_events.items():
            listener.status_event(target_type, target_id, msg, status_code)
        for (node_id, (timestamp, state, error, is_manual)) in self.execution_events.items():
            listener.execution_event(node_id, timestamp, state, error, is_manual)

    def detach_listener(self, listener: TopologyListenerAPI) -> None:
        self.listeners.remove(listener)

    def status_event_handler(self, target_type: Literal["node"] | Literal["configuration"],
                             target_id: str, msg: str,
                             status_code: Literal["error"] | Literal["warning"] | Literal["info"] | Literal["log"]):
        self.status_events[(target_type, target_id)] = (msg, status_code)
        for listener in self.listeners:
            listener.status_event(target_type, target_id, msg, status_code)

    def execution_event_handler(self, timestamp: float | None,
                                node_id: str,
                                state: Literal["pending"] | Literal["running"] | Literal["completed"] | Literal[
                                    "failed"],
                                error: str | None, is_manual: bool) -> None:
        self.execution_events[node_id] = (timestamp, state, error, is_manual)
        for listener in self.listeners:
            listener.execution_event(timestamp, node_id, state, error, is_manual)

    def execution_complete_handler(self):
        for listener in self.listeners:
            listener.execution_completed()
        self.execution_complete_event.set()

    def request_open_client(self, origin_id, origin_type, session_id, client_name):
        for listener in self.listeners:
            listener.request_open_client(origin_id, origin_type, session_id, client_name)

    ####################################################################################################################
    # localisation

    @check_notclosed
    @threadsafe
    def get_type_for_node(self, node_id):
        return self.topology.get_node_type(node_id)

    @check_notclosed
    @threadsafe
    def get_localisation_bundle(self, package_id, for_language=""):
        return self.topology.get_localisation_bundle(package_id, for_language=for_language)

__init__(topology_folder, package_list, temporary_folder=None, engine_launcher=None, read_only=False, set_engine_pid_callback=None, import_from_path=None)

Create a topology instance

Parameters:

Name Type Description Default
topology_folder str

the folder used to store the topology's definition and files

required
package_list list[str]

a list of the paths to python packages containing hyrrokkin package schemas (a schema.json file)

required
temporary_folder str | None

a folder used to store files temporarily during execution

None
engine_launcher Union[EngineLauncher, None]

the engine_launcher to use to run the topology in a remote process. if not specified, select an appropriate one for the packages loaded

None
read_only bool

if true, do not allow nodes and configurations to persist data/properties changes to disk when running the topology

False
set_engine_pid_callback Callable[[int], None]

function that is called with the process identifier (PID) of the engine process if the engine is launched in a sub-process

None
import_from_path str

import from this path to initialise the topology

None
Source code in hyrrokkin/api/topology.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def __init__(self, topology_folder: str, package_list: list[str], temporary_folder: str | None = None,
             engine_launcher: Union[EngineLauncher, None] = None,
             read_only: bool = False,
             set_engine_pid_callback: Callable[[int], None] | None = None,
             import_from_path: str | None = None
             ):
    """
    Create a topology instance

    Args:
        topology_folder: the folder used to store the topology's definition and files
        package_list: a list of the paths to python packages containing hyrrokkin package schemas (a schema.json file)
        temporary_folder: a folder used to store files temporarily during execution
        engine_launcher: the engine_launcher to use to run the topology in a remote process.  if not specified, select an appropriate one
                         for the packages loaded
        read_only: if true, do not allow nodes and configurations to persist data/properties changes to disk when running the topology
        set_engine_pid_callback: function that is called with the process identifier (PID) of the engine process if the engine is launched in a sub-process
        import_from_path: import from this path to initialise the topology
    """
    # closed flag, initially False (the check_notclosed decorator raises once it is True)
    self.closed = False
    # paused flag, initially True
    self.paused = True
    # re-entrant lock (presumably what the @threadsafe decorator uses - TODO confirm, decorator not shown here)
    self.lock = threading.RLock()
    self.set_engine_pid_callback = set_engine_pid_callback
    # the core topology is created and loaded from topology_folder before anything else reads the schema
    self.topology = CoreTopology(topology_folder, package_list)
    self.topology.load_dir()
    self.schema = SchemaType(self.topology.get_schema())
    self.execution_complete_event = Event()
    self.serde = Serde(self.topology.get_schema())

    # optional initial import, done through the core topology directly
    if import_from_path is not None:
        self.topology.import_from(import_from_path)

    self.temporary_folder = temporary_folder
    self.engine_launcher = engine_launcher
    self.read_only = read_only

    # no runner exists at construction time
    self.runner = None

    self.output_listeners = {}
    self.injected_input_values = {}

    self.listeners: list[TopologyListenerAPI] = []
    self.status_events = {}
    self.execution_events = {}

check_notclosed(func)

Decorator that prevents access to a method once the `closed` attribute is set to True. Parameter `func`: the method to be decorated. Returns: the wrapped method.

Source code in hyrrokkin/api/topology.py
38
39
40
41
42
43
44
45
46
47
48
49
50
def check_notclosed(func):
    """
    Decorator that prevents access to a method once the closed attribute is set to True
    :param func: the method to be decorated
    :return: wrapped method
    """
    # imported here so the decorator does not depend on a module-level import being present
    import functools

    # functools.wraps keeps the decorated method's name and docstring visible on the wrapper
    @functools.wraps(func)
    def notclosed_wrapper(self, *args, **kwargs):
        if self.closed:
            raise Exception("Topology is closed")
        return func(self, *args, **kwargs)

    return notclosed_wrapper

A topology inherits the following methods:

TopologyApi

Bases: ABC

Source code in hyrrokkin/interfaces/topology_api.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
class TopologyApi(ABC):
    """
    Abstract interface for building, running, inspecting, loading and saving a topology of nodes and links.

    Notes:
        NOTE(review): several signatures below use mutable ``{}`` default values.  They are kept unchanged to
        preserve the published interface; implementations should treat those arguments as read-only.
    """

    @abstractmethod
    def set_metadata(self, metadata: dict[str, str], ref: str | None = None):
        """
        Set metadata for this topology

        Args:
            metadata: a dictionary containing metadata, consisting of string keys and values.
            ref: an optional reference that identifies this request

        Notes:
            the following keys will be understood by hyrrokkin based tools - version, description, authors
        """

    @abstractmethod
    def add_node(self, node_id: str | None, node_type: str, metadata: dict[str, JsonType] = {},
                 properties: dict[str, JsonType] = {},
                 data: dict[str, bytes] = {}, copy_from_node_id: str = "", ref: str | None = None) -> str:
        """
        Add a node to the topology

        Args:
            node_id: the node's requested unique identifier or None
            node_type: the type of the node, a string of the form package_id:node_type_id
            metadata: a dictionary containing the node's metadata
            properties: dictionary containing the node's property names and values, must be JSON serialisable
            data: dictionary mapping data keys to the binary data used to initialise the node
            copy_from_node_id: if set, initialise with the properties and data copied from this node rather than supplied in the data and properties arguments
            ref: an optional reference that identifies this request

        Returns:
            the id of the added node
        """

    @abstractmethod
    def update_node_metadata(self, node_id: str | None, metadata: dict[str, JsonType] = {},
                             ref: str | None = None) -> None:
        """
        Update a node's metadata

        Args:
            node_id: the node's requested unique identifier or None
            metadata: a dictionary containing the new metadata
            ref: an optional reference that identifies this request
        """

    @abstractmethod
    def remove_node(self, node_id: str, ref: str | None = None):
        """
        Remove a node from the topology

        Args:
            node_id: the node's unique identifier
            ref: an optional reference that identifies this request
        """

    @abstractmethod
    def add_link(self, link_id: str, from_node_id: str, from_port: str | None, to_node_id: str,
                 to_port: str | None, ref: str | None = None) -> str:
        """
        Add a link to the topology

        Args:
            link_id: a requested unique identifier for the link
            from_node_id: node id of the source node
            from_port: port name on the source node, can be omitted if the "from" node has only one output port
            to_node_id: node id of the destination node
            to_port: port name on the destination node, can be omitted if the "to" node has only one input port
            ref: an optional reference that identifies this request

        Raises:
            InvalidLinkError: if the link cannot be added

        Returns:
            link_id of the added link
        """

    # NOTE(review): unlike its siblings this method is not marked @abstractmethod - confirm whether intentional
    def remove_link(self, link_id: str, ref: str | None = None):
        """
        Remove a link from the topology

        Args:
            link_id: the link's unique identifier
            ref: an optional reference that identifies this request

        """

    @abstractmethod
    def clear(self, ref: str | None = None):
        """
        Remove all nodes and links from the topology

        Args:
            ref: an optional reference that identifies this request

        """

    @abstractmethod
    def start(self, ref: str | None = None):
        """
        Start execution of the topology

        Args:
            ref: an optional reference that identifies this request
        """

    @abstractmethod
    def is_started(self) -> bool:
        """
        Check whether the topology is executing

        Returns:
            True if the topology is executing
        """

    @abstractmethod
    def pause(self, ref: str | None = None):
        """
        Pause execution of the topology.  Until resume is called, no new nodes will start running.

        Args:
            ref: an optional reference that identifies this request
        """

    @abstractmethod
    def resume(self, ref: str | None = None):
        """
        Resume execution of the topology

        Args:
            ref: an optional reference that identifies this request
        """

    @abstractmethod
    def is_paused(self) -> bool:
        """
        Check whether the topology is paused

        Returns:
            True if the topology is paused
        """

    @abstractmethod
    def restart(self, ref: str | None = None):
        """
        Restart execution of the topology

        Args:
            ref: an optional reference that identifies this request

        """

    @abstractmethod
    def run(self) -> dict[str, str]:
        """
        Run the topology until all nodes have completed or failed

        Returns:
            a dictionary containing the error messages returned from any failed nodes
        """

    @abstractmethod
    def run_task(self, task_name: str, input_port_values: dict[str, bytes | list[bytes]], output_ports: list[str],
                 ref: str | None = None, decode_outputs: bool = True) -> tuple[
        dict[str, bytes], dict[str, str]]:
        """
        Run a topology task

        Args:
            task_name: a descriptive name of the task
            input_port_values: map from port identifier (node_id:input_port_name) to a binary-encoded input value or list of values
            output_ports: a list of port identifiers (node_id:output_port_name) specifying output ports to return values from
            ref: an optional reference that identifies this request
            decode_outputs: whether to decode output values from binary or return the binary values

        Returns:
            a tuple of (dict mapping from output port name to value, dict mapping from node id to error string)
        """

    @abstractmethod
    def reload_node(self, node_id: str, properties: JsonType, data: dict[str, bytes], ref: str | None = None):
        """
        Reload a node with new properties and data, triggering re-execution of the node and all downstream nodes
        if the topology execution has already started

        Reloading creates a new instance of the node with the new properties and data

        Args:
            node_id: the id of the node to reload
            properties: the properties to reload
            data: the data to reload
            ref: an optional reference that identifies this request
        """

    ####################################################################################################################
    # retrieve node properties and data

    @abstractmethod
    def get_node_properties(self, node_id: str) -> dict[str, JsonType]:
        """
        Gets the properties of a node

        Args:
            node_id: the node's identifier

        Returns:
            dictionary containing properties
        """

    @abstractmethod
    def get_node_data(self, node_id: str, key: str) -> bytes | None:
        """
        Get binary data associated with this node.

        Args:
            node_id: node identifier
            key: a key to locate the data (can only contain alphanumeric characters and underscores)

        Returns:
            data or None if no data is associated with the key
        """

    @abstractmethod
    def get_node_data_keys(self, node_id: str) -> list[str]:
        """
        Get the list of keys for which the node stores binary data

        Args:
            node_id: node identifier

        Returns:
            a list of data keys
        """

    ####################################################################################################################
    # interact with the topology

    @abstractmethod
    def add_output_listener(self, node_id: str, output_port_name: str, listener: Callable[[Any], None],
                            decode_outputs: bool = True):
        """
        Listen for values output from a node in the topology.  Replaces any existing listener on the node/port if present.

        Args:
            node_id: the node id
            output_port_name: the name of the node's output port
            listener: a callback function which is invoked with the value(s) on the output port when the node is run
            decode_outputs: if True, decode output values from binary, otherwise return the binary values
        """

    @abstractmethod
    def remove_output_listener(self, node_id: str, output_port_name: str):
        """
        Remove a listener from a node/port

        Args:
            node_id: the node id
            output_port_name: the name of the node's output port
        """

    @abstractmethod
    def inject_input_value(self, node_id: str, input_port_name: str, value: Any):
        """
        Inject a single input value into a node in the topology, via an input port.  The port must not be connected.

        Args:
            node_id: the node id
            input_port_name: the name of the node's input port
            value: the value to inject
        """

    @abstractmethod
    def inject_input_values(self, node_id: str, input_port_name: str, values: list[Any]):
        """
        Inject input values into a node in the topology, via an input port.  The port must not be connected and must allow multiple input connections.

        Args:
            node_id: the node id
            input_port_name: the name of the node's input port
            values: the values to inject.
        """

    ####################################################################################################################
    # session and client related

    @abstractmethod
    def open_session(self, session_id: str | None = None) -> str:
        """
        Open a new interactive session

        Args:
            session_id: the identifier of the session or None to generate a new session identifier

        Returns:
            the session identifier for the opened session
        """

    @abstractmethod
    def close_session(self, session_id: str):
        """
        Close a session

        Args:
            session_id: the identifier of the session to close
        """

    @abstractmethod
    def attach_node_client(self, node_id: str, session_id: str = "", client_id: str = "",
                           client_options: dict = {}) -> ClientApi:
        """
        Attach a client instance to a node.  Any client already attached to the node with the same client_id
        will be detached.

        Args:
            node_id: the node to which the client is to be attached
            session_id: the id of an opened interactive session
            client_id: the name of the client to attach, as defined in the node's schema
            client_options: optional, a dictionary with extra parameters from the client

        Returns:
             an instance which implements the Client API and provides methods to interact with the node

        """

    @abstractmethod
    def attach_configuration_client(self, package_id: str, session_id: str = "", client_id: str = "",
                                    client_options: dict = {}) -> ClientApi:
        """
        Attach a client instance to a package configuration

        Args:
            package_id: the package configuration to which the client is to be attached
            session_id: the id of an opened interactive session
            client_id: the id of the client to attach
            client_options: optional, a dictionary with extra parameters for the client

        Returns:
             an object which implements the Client API and provides methods to interact with the configuration
        """

    ####################################################################################################################
    # load and save

    @abstractmethod
    def load(self, from_file: io.BytesIO, include_data: bool = True, ref: str | None = None) -> Tuple[
        list[str], list[str], dict[str, str]]:
        """
        Merge the nodes and links topology from an opened ZIP file into this topology.  Node and link ids will be renamed to avoid clashes.

        Args:
            from_file: a file opened in binary mode
            include_data: whether or not to include the data from the file
            ref: an optional reference that identifies this request

        Returns:
            a tuple containing the list of added node ids, the list of added link ids, and
            a dictionary containing any node renamings performed to avoid id collisions with existing nodes

        Notes:
            configuration properties/data will not be loaded if this method is called after the topology is started
            topology metadata will not be loaded if topology metadata already exists
        """

    @abstractmethod
    def save(self, to_file: io.BufferedWriter | None = None, include_data: bool = True):
        """
        Save the topology to a zip file

        Args:
            to_file: a file opened in binary mode for writing
            include_data: whether or not to include the data in the file
        """

    @abstractmethod
    def serialise(self):
        """
        Serialise the topology structure to JSON

        Returns:
            JSON-serialisable dictionary

        """

    @abstractmethod
    def import_from(self, from_path: str, include_data: bool = True, ref: str | None = None) -> Tuple[
        list[str], list[str], dict[str, str]]:
        """
        Merge the nodes and links topology from a YAML file or ZIP file into this topology.  Node and link ids will be renamed to avoid clashes.

        Args:
            from_path: the path to a YAML or ZIP file describing the topology
            include_data: whether or not to include any data referenced in the YAML file
            ref: an optional reference that identifies this request

        Returns:
            a tuple containing the list of added node ids, the list of added link ids, and
            a dictionary containing any node renamings performed to avoid id collisions with existing nodes

        Notes:
            configuration properties/data will not be loaded if this method is called after the topology is started
            topology metadata will not be loaded if topology metadata already exists
        """

    @abstractmethod
    def export_to(self, to_path: str, include_data: bool = True):
        """
        Save the topology to a zip file or yaml file

        Args:
            to_path: the path to the YAML file or ZIP file to export to
            include_data: whether or not to export data as well as properties
        """

    ####################################################################################################################
    # topology introspection

    @abstractmethod
    def get_nodes(self) -> dict[str, NodeTypeApi]:
        """
        Get details of the node ids and types in the topology

        Returns:
            dict containing a mapping from node id to node type
        """

    @abstractmethod
    def get_links(self) -> dict[str, tuple[LinkTypeApi, str, str]]:
        """
        Get details of the link ids and link types in the topology

        Returns:
            dict containing a mapping from link id to a tuple of format (link_type,<node_id>:<from_port_name>,<node_id>:<to_port_name>)
        """

    @abstractmethod
    def get_link_ids_for_node(self, node_id: str) -> list[str]:
        """
        Get the ids of all links in the topology that are connected to a node

        Args:
            node_id: the id of the node

        Returns:
            list of link ids connected to the node
        """

    ####################################################################################################################
    # schema introspection

    # NOTE(review): unlike its siblings this method is not marked @abstractmethod - confirm whether intentional
    def get_schema(self) -> SchemaTypeApi:
        """
        Get the schema used by this topology

        Returns:
            an instance (annotated as SchemaTypeApi) allowing the packages, node types and link types in the schema to be examined

        """

    ####################################################################################################################
    # Attach and Detach Listeners

    @abstractmethod
    def attach_listener(self, listener: TopologyListenerAPI) -> None:
        """
        Attach a listener instance which implements the TopologyListenerAPI

        Args:
            listener: the listener instance
        """

    @abstractmethod
    def detach_listener(self, listener: TopologyListenerAPI) -> None:
        """
        Detach a listener

        Args:
            listener: the listener instance
        """

    ####################################################################################################################
    # localisation

    # NOTE(review): unlike its siblings this method is not marked @abstractmethod - confirm whether intentional
    def get_type_for_node(self, node_id: str) -> tuple[str, str]:
        """
        Get the package_id, node_type_id for a given node

        Args:
            node_id: the identifier of the node

        Returns:
             tuple containing package id and node type id
        """

    # NOTE(review): unlike its siblings this method is not marked @abstractmethod - confirm whether intentional
    def get_localisation_bundle(self, package_id: str, for_language: str = "") -> tuple[str, dict[str, str]]:
        """
        Get a localisation bundle for a package.  If the requested language is not available returns the default language code and bundle.

        Args:
            package_id: the package identifier
            for_language: specify the language, if not specified return the default language bundle

        Returns:
            a tuple with the language code and a dictionary containing the localisation bundle
        """

add_link(link_id, from_node_id, from_port, to_node_id, to_port, ref=None) abstractmethod

Add a link to the topology

Parameters:

Name Type Description Default
link_id str

a requested unique identifier for the link

required
from_node_id str

node id of the source node

required
from_port str | None

port name on the source node, can be omitted if the "from" node has only one output port

required
to_node_id str

node id of the destination node

required
to_port str | None

port name on the destination node, can be omitted if the "to" node has only one input port

required
ref str | None

an optional reference that identifies this request

None

Raises:

Type Description
InvalidLinkError

if the link cannot be added

Returns:

Type Description
str

link_id of the added link

Source code in hyrrokkin/interfaces/topology_api.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
@abstractmethod
def add_link(
    self,
    link_id: str,
    from_node_id: str,
    from_port: str | None,
    to_node_id: str,
    to_port: str | None,
    ref: str | None = None,
) -> str:
    """
    Connect a port on a source node to a port on a destination node with a new link

    Args:
        link_id: the unique identifier requested for the new link
        from_node_id: id of the node at the source end of the link
        from_port: name of the source node's port, can be omitted if the "from" node has only one output port
        to_node_id: id of the node at the destination end of the link
        to_port: name of the destination node's port, can be omitted if the "to" node has only one input port
        ref: optional reference used to identify this request

    Raises:
        InvalidLinkError: when the link cannot be added

    Returns:
        the link_id of the link that was added
    """

add_node(node_id, node_type, metadata={}, properties={}, data={}, copy_from_node_id='', ref=None) abstractmethod

Add a node to the topology

Parameters:

Name Type Description Default
node_id str | None

the node's requested unique identifier or None

required
node_type str

the type of the node, a string of the form package_id:node_type_id

required
metadata dict[str, JsonType]

a dictionary containing the new metadata

{}
properties dict[str, JsonType]

dictionary containing the node's property names and values, must be JSON serialisable

{}
data dict[str, bytes]

dictionary mapping data keys to the binary data used to initialise the node

{}
copy_from_node_id str

if set, initialise with the properties and data copied from this node rather than supplied in the data and properties arguments

''
ref str | None

an optional reference that identifies this request

None

Returns:

Type Description
str

the id of the added node

Source code in hyrrokkin/interfaces/topology_api.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
@abstractmethod
def add_node(self, node_id: str | None, node_type: str, metadata: dict[str, JsonType] = {},
             properties: dict[str, JsonType] = {},
             data: dict[str, bytes] = {}, copy_from_node_id: str = "", ref: str | None = None) -> str:
    """
    Add a node to the topology

    Args:
        node_id: the node's requested unique identifier or None
        node_type: the type of the node, a string of the form package_id:node_type_id
        metadata: a dictionary containing the node's metadata
        properties: dictionary containing the node's property names and values, must be JSON serialisable
        data: dictionary mapping data keys to the binary data used to initialise the node
        copy_from_node_id: if set, initialise with the properties and data copied from this node rather than supplied in the data and properties arguments
        ref: an optional reference that identifies this request

    Returns:
        the id of the added node

    Notes:
        NOTE(review): the ``{}`` defaults are shared mutable objects and are kept only to preserve the
        published signature; implementations should not mutate these arguments.
    """

add_output_listener(node_id, output_port_name, listener, decode_outputs=True) abstractmethod

Listen for values output from a node in the topology. Replaces any existing listener on the node/port if present.

Parameters:

Name Type Description Default
node_id str

the node id

required
output_port_name str

the name of the node's output port

required
listener Callable[[Any], None]

a callback function which is invoked with the value(s) on the output port when the node is run

required
decode_outputs bool

if True, decode output values from binary, otherwise return the binary values

True
Source code in hyrrokkin/interfaces/topology_api.py
262
263
264
265
266
267
268
269
270
271
272
@abstractmethod
def add_output_listener(
    self,
    node_id: str,
    output_port_name: str,
    listener: Callable[[Any], None],
    decode_outputs: bool = True,
):
    """
    Register a callback for the values a node emits on one of its output ports.  Any listener already registered for the same node/port is replaced.

    Args:
        node_id: id of the node to listen to
        output_port_name: name of the output port on that node
        listener: callback invoked with the value(s) on the output port when the node is run
        decode_outputs: when True output values are decoded from binary, otherwise the binary values are passed through
    """

attach_configuration_client(package_id, session_id='', client_id='', client_options={}) abstractmethod

Attach a client instance to a package configuration

Parameters:

Name Type Description Default
package_id str

the package configuration to which the client is to be attached

required
session_id str

the id of an opened interactive session

''
client_id str

the id of the client to attach

''
client_options dict

optional, a dictionary with extra parameters for the client

{}

Returns:

Type Description
ClientApi

an object which implements the Client API and provides methods to interact with the configuration

Source code in hyrrokkin/interfaces/topology_api.py
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
@abstractmethod
def attach_configuration_client(
    self,
    package_id: str,
    session_id: str = "",
    client_id: str = "",
    client_options: dict = {},
) -> ClientApi:
    """
    Create a client attachment to the configuration of a package

    Args:
        package_id: identifies the package configuration the client is attached to
        session_id: id of an interactive session that has been opened
        client_id: id of the client being attached
        client_options: optional dictionary of extra parameters for the client

    Returns:
         an object implementing the Client API, providing methods to interact with the configuration
    """

attach_listener(listener) abstractmethod

Attach a listener instance which implements the TopologyListenerAPI

Parameters:

Name Type Description Default
listener TopologyListenerAPI

the listener instance

required
Source code in hyrrokkin/interfaces/topology_api.py
482
483
484
485
486
487
488
489
@abstractmethod
def attach_listener(
    self,
    listener: TopologyListenerAPI,
) -> None:
    """
    Register a listener object implementing the TopologyListenerAPI with this topology

    Args:
        listener: the listener instance to attach
    """

attach_node_client(node_id, session_id='', client_id='', client_options={}) abstractmethod

Attach a client instance to a node. Any client already attached to the node with the same client_id will be detached.

Parameters:

Name Type Description Default
node_id str

the node to which the client is to be attached

required
session_id str

the id of an opened interactive session

''
client_id str

the name of the client to attach, as defined in the node's schema

''
client_options dict

optional, a dictionary with extra parameters from the client

{}

Returns:

Type Description
ClientApi

an instance which implements the Client API and provides methods to interact with the node

Source code in hyrrokkin/interfaces/topology_api.py
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
@abstractmethod
def attach_node_client(
    self,
    node_id: str,
    session_id: str = "",
    client_id: str = "",
    client_options: dict = {},
) -> ClientApi:
    """
    Create a client attachment to a node.  A client with the same client_id that is already attached
    to the node will be detached.

    Args:
        node_id: id of the node the client is attached to
        session_id: id of an interactive session that has been opened
        client_id: name of the client to attach, as defined in the node's schema
        client_options: optional dictionary of extra parameters from the client

    Returns:
         an instance implementing the Client API, providing methods to interact with the node

    """

clear(ref=None) abstractmethod

Remove all nodes and links from the topology

Parameters:

Name Type Description Default
ref str | None

an optional reference that identifies this request

None
Source code in hyrrokkin/interfaces/topology_api.py
118
119
120
121
122
123
124
125
126
@abstractmethod
def clear(
    self,
    ref: str | None = None,
):
    """
    Empty the topology by removing every node and link

    Args:
        ref: optional reference used to identify this request

    """

close_session(session_id) abstractmethod

Close a session

Parameters:

Name Type Description Default
session_id str

the identifier of the session to close

required
Source code in hyrrokkin/interfaces/topology_api.py
321
322
323
324
325
326
327
328
@abstractmethod
def close_session(
    self,
    session_id: str,
):
    """
    Close an interactive session

    Args:
        session_id: identifies the session that should be closed
    """

detach_listener(listener) abstractmethod

Detach a listener

Parameters:

Name Type Description Default
listener TopologyListenerAPI

the listener instance

required
Source code in hyrrokkin/interfaces/topology_api.py
491
492
493
494
495
496
497
498
@abstractmethod
def detach_listener(
    self,
    listener: TopologyListenerAPI,
) -> None:
    """
    Unregister a listener from this topology

    Args:
        listener: the listener instance to detach
    """

export_to(to_path, include_data=True) abstractmethod

save the topology to a zip file or yaml file

Parameters:

Name Type Description Default
to_path str

the path to the YAML file or ZIP file to export to

required
include_data bool

whether or not to export data as well as properties

True
Source code in hyrrokkin/interfaces/topology_api.py
426
427
428
429
430
431
432
433
434
@abstractmethod
def export_to(
    self,
    to_path: str,
    include_data: bool = True,
):
    """
    Write the topology out to a ZIP file or a YAML file

    Args:
        to_path: path of the YAML file or ZIP file to export to
        include_data: whether data should be exported in addition to properties
    """

get_link_ids_for_node(node_id) abstractmethod

Get the ids of all links in the topology that are connected to a node

Parameters:

Name Type Description Default
node_id str

the id of the node

required

Returns:

Type Description
list[str]

list of link ids connected to the node

Source code in hyrrokkin/interfaces/topology_api.py
457
458
459
460
461
462
463
464
465
466
467
@abstractmethod
def get_link_ids_for_node(
    self,
    node_id: str,
) -> list[str]:
    """
    Find every link in the topology that is connected to the given node

    Args:
        node_id: id of the node of interest

    Returns:
        the ids of the links connected to the node, as a list
    """

get_links() abstractmethod

Get details of the link ids and link types in the topology

Returns:

Type Description
dict[str, tuple[LinkTypeApi, str, str]]

dict containing a mapping from link id to a tuple of format (link_type, <node_id>:<from_port_name>, <node_id>:<to_port_name>)

Source code in hyrrokkin/interfaces/topology_api.py
448
449
450
451
452
453
454
455
@abstractmethod
def get_links(self) -> dict[str, tuple[LinkTypeApi, str, str]]:
    """
    Get details of the link ids and link types in the topology

    Returns:
        dict containing a mapping from link id to a tuple of format (link_type,<node_id>:<from_port_name>,<node_id>:<to_port_name>)
    """

get_localisation_bundle(package_id, for_language='')

Get a localisation bundle for a package. If the requested language is not available returns the default language code and bundle.

Parameters:

Name Type Description Default
package_id str

the package identifier

required
for_language str

specify the language, if not specified return the default language bundle

''

Returns:

Type Description
tuple[str, dict[str, str]]

a tuple with the language code and a dictionary containing the localisation bundle

Source code in hyrrokkin/interfaces/topology_api.py
513
514
515
516
517
518
519
520
521
522
523
def get_localisation_bundle(
    self,
    package_id: str,
    for_language: str = "",
) -> tuple[str, dict[str, str]]:
    """
    Look up the localisation bundle of a package.  When the requested language is not available the default language code and bundle are returned.

    Args:
        package_id: identifies the package
        for_language: the language wanted; when not specified the default language bundle is returned

    Returns:
        a tuple of (language code, dictionary containing the localisation bundle)
    """

get_node_data(node_id, key) abstractmethod

Get binary data associated with this node.

Parameters:

Name Type Description Default
node_id str

node identifier

required
key str

a key to locate the data (can only contain alphanumeric characters and underscores)

required

Returns:

Type Description
bytes | None

data or None if no data is associated with the key

Source code in hyrrokkin/interfaces/topology_api.py
234
235
236
237
238
239
240
241
242
243
244
245
@abstractmethod
def get_node_data(
    self,
    node_id: str,
    key: str,
) -> bytes | None:
    """
    Fetch the binary data a node stores under a key.

    Args:
        node_id: identifier of the node
        key: key locating the data (can only contain alphanumeric characters and underscores)

    Returns:
        the data, or None if no data is associated with the key
    """

get_node_data_keys(node_id) abstractmethod

Get the list of keys for which the node stores binary data

Parameters:

Name Type Description Default
node_id str

node identifier

required

Returns:

Type Description
list[str]

a list of data keys

Source code in hyrrokkin/interfaces/topology_api.py
247
248
249
250
251
252
253
254
255
256
257
@abstractmethod
def get_node_data_keys(self, node_id: str) -> list[str]:
    """
    Get the list of keys for which the node stores binary data

    Args:
        node_id: node identifier

    Returns:
        a list of data keys
    """

get_node_properties(node_id) abstractmethod

Gets the properties of a node

Parameters:

Name Type Description Default
node_id str

the node's identifier

required

Returns:

Type Description
dict[str, JsonType]

dictionary containing properties

Source code in hyrrokkin/interfaces/topology_api.py
222
223
224
225
226
227
228
229
230
231
232
@abstractmethod
def get_node_properties(self, node_id: str) -> dict[str, JsonType]:
    """
    Gets the properties of a node

    Args:
        node_id: the node's identifier

    Returns:
        dictionary containing properties
    """

get_nodes() abstractmethod

Get details of the node ids and types in the topology

Returns:

Type Description
dict[str, NodeTypeApi]

dict containing a mapping from node id to node type

Source code in hyrrokkin/interfaces/topology_api.py
439
440
441
442
443
444
445
446
@abstractmethod
def get_nodes(self) -> dict[str, NodeTypeApi]:
    """
    Get details of the node ids and types in the topology

    Returns:
        dict containing a mapping from node id to node type
    """

get_schema()

Returns: an instance implementing the SchemaApi allowing the packages, node types and link types in the schema to be examined

Source code in hyrrokkin/interfaces/topology_api.py
472
473
474
475
476
477
def get_schema(self) -> SchemaTypeApi:
    """

    Returns: an instance implementing the SchemaApi allowing the packages, node types and link types in the schema to be examined

    """

get_type_for_node(node_id)

Get the package_id, node_type_id for a given node

Parameters:

Name Type Description Default
node_id str

the identifier of the node

required

Returns: tuple containing package id and node type id

Source code in hyrrokkin/interfaces/topology_api.py
503
504
505
506
507
508
509
510
511
def get_type_for_node(self, node_id: str) -> tuple[str, str]:
    """
    Get the package_id, node_type_id for a given node

    Args:
        node_id: the identifier of the node

    Returns:
        tuple containing package id and node type id
    """

import_from(from_path, include_data=True, ref=None) abstractmethod

Merge the nodes and links topology from a YAML file or ZIP file into this topology. Node and link ids will be renamed to avoid clashes.

Parameters:

Name Type Description Default
from_path str

the path to a YAML or ZIP file describing the topology

required
include_data bool

whether or not to include any data referenced in the YAML file

True
ref str | None

an optional reference that identifies this request

None

Returns:

Type Description
Tuple[list[str], list[str], dict[str, str]]

a tuple containing the list of added node ids, the list of added link ids, and a dictionary containing any node renamings performed to avoid id collisions with existing nodes

Notes

Configuration properties/data will not be loaded if this method is called after the topology is started. Topology metadata will not be loaded if topology metadata already exists.

Source code in hyrrokkin/interfaces/topology_api.py
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
@abstractmethod
def import_from(self, from_path: str, include_data: bool = True, ref: str | None = None) -> Tuple[
    list[str], list[str], dict[str, str]]:
    """
    Merge the nodes and links topology from a YAML file or ZIP file into this topology.  Node and link ids will be renamed to avoid clashes.

    Args:
        from_path: the path to a YAML or ZIP file describing the topology
        include_data: whether or not to include any data referenced in the YAML file
        ref: an optional reference that identifies this request

    Returns:
        a tuple containing the list of added node ids, the list of added link ids, and
        a dictionary containing any node renamings performed to avoid id collisions with existing nodes

    Notes:
        configuration properties/data will not be loaded if this method is called after the topology is started
        topology metadata will not be loaded if topology metadata already exists
    """

inject_input_value(node_id, input_port_name, value) abstractmethod

Inject an input value into a node in the topology, via an input port. The port must not be connected.

Parameters:

Name Type Description Default
node_id str

the node id

required
input_port_name str

the name of the node's input port

required
value Any

the value to inject

required
Source code in hyrrokkin/interfaces/topology_api.py
284
285
286
287
288
289
290
291
292
293
@abstractmethod
def inject_input_value(self, node_id: str, input_port_name: str, value: Any):
    """
    Inject an input value into a node in the topology, via an input port.  The port must not be connected.

    Args:
        node_id: the node id
        input_port_name: the name of the node's input port
        value: the value to inject
    """

inject_input_values(node_id, input_port_name, values) abstractmethod

Inject input values into a node in the topology, via an input port. The port must not be connected and must allow multiple input connections.

Parameters:

Name Type Description Default
node_id str

the node id

required
input_port_name str

the name of the node's input port

required
values list[Any]

the values to inject.

required
Source code in hyrrokkin/interfaces/topology_api.py
295
296
297
298
299
300
301
302
303
304
@abstractmethod
def inject_input_values(self, node_id: str, input_port_name: str, values: list[Any]):
    """
    Inject input values into a node in the topology, via an input port.  The port must not be connected and must allow multiple input connections.

    Args:
        node_id: the node id
        input_port_name: the name of the node's input port
        values: the values to inject.
    """

is_paused() abstractmethod

Returns: True if the topology is paused

Source code in hyrrokkin/interfaces/topology_api.py
161
162
163
164
165
@abstractmethod
def is_paused(self) -> bool:
    """
    Returns: True if the topology is paused
    """

is_started() abstractmethod

Returns: True if the topology is executing

Source code in hyrrokkin/interfaces/topology_api.py
137
138
139
140
141
@abstractmethod
def is_started(self) -> bool:
    """
    Returns: True if the topology is executing
    """

load(from_file, include_data=True, ref=None) abstractmethod

Merge the nodes and links topology from an opened ZIP file into this topology. Node and link ids will be renamed to avoid clashes.

Parameters:

Name Type Description Default
from_file BytesIO

a file opened in binary mode

required
include_data bool

whether or not to include the data from the file

True
ref str | None

an optional reference that identifies this request

None

Returns:

Type Description
Tuple[list[str], list[str], dict[str, str]]

a tuple containing the list of added node ids, the list of added link ids, and a dictionary containing any node renamings performed to avoid id collisions with existing nodes

Notes

Configuration properties/data will not be loaded if this method is called after the topology is started. Topology metadata will not be loaded if topology metadata already exists.

Source code in hyrrokkin/interfaces/topology_api.py
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
@abstractmethod
def load(self, from_file: io.BytesIO, include_data: bool = True, ref: str | None = None) -> Tuple[
    list[str], list[str], dict[str, str]]:
    """
    Merge the nodes and links topology from an opened ZIP file into this topology.  Node and link ids will be renamed to avoid clashes.

    Args:
        from_file: a file opened in binary mode
        include_data: whether or not to include the data from the file
        ref: an optional reference that identifies this request

    Returns:
        a tuple containing the list of added node ids, the list of added link ids, and
        a dictionary containing any node renamings performed to avoid id collisions with existing nodes

    Notes:
        configuration properties/data will not be loaded if this method is called after the topology is started
        topology metadata will not be loaded if topology metadata already exists
    """

open_session(session_id=None) abstractmethod

Open a new interactive session

Parameters:

Name Type Description Default
session_id str | None

the identifier of the session or None to generate a new session identifier

None

Returns:

Type Description
str

the session identifier for the opened session

Source code in hyrrokkin/interfaces/topology_api.py
309
310
311
312
313
314
315
316
317
318
319
@abstractmethod
def open_session(self, session_id: str | None = None) -> str:
    """
    Open a new interactive session

    Args:
        session_id: the identifier of the session or None to generate a new session identifier

    Returns:
        the session identifier for the opened session
    """

pause(ref=None) abstractmethod

Pause execution of the topology. Until resume is called, no new nodes will start running.

Parameters:

Name Type Description Default
ref str | None

an optional reference that identifies this request

None
Source code in hyrrokkin/interfaces/topology_api.py
143
144
145
146
147
148
149
150
@abstractmethod
def pause(self, ref: str | None = None):
    """
    Pause execution of the topology.  Until resume is called, no new nodes will start running.

    Args:
        ref: an optional reference that identifies this request
    """

reload_node(node_id, properties, data, ref=None) abstractmethod

Reload a node with new properties and data, triggering re-execution of the node and all downstream nodes if the topology execution has already started

Reloading creates a new instance of the node with the new properties and data

Parameters:

Name Type Description Default
node_id str

the id of the node to reload

required
properties JsonType

the properties to reload

required
data dict[str, bytes]

the data to reload

required
ref str | None

an optional reference that identifies this request

None
Source code in hyrrokkin/interfaces/topology_api.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
@abstractmethod
def reload_node(self, node_id: str, properties: JsonType, data: dict[str, bytes], ref: str | None = None):
    """
    Reload a node with new properties and data, triggering re-execution of the node and all downstream nodes
    if the topology execution has already started

    Reloading creates a new instance of the node with the new properties and data

    Args:
        node_id: the id of the node to reload
        properties: the properties to reload
        data: the data to reload
        ref: an optional reference that identifies this request
    """

remove_link(link_id, ref=None)

Remove a link from the topology

Parameters:

Name Type Description Default
link_id str

the link's unique identifier

required
ref str | None

an optional reference that identifies this request

None
Source code in hyrrokkin/interfaces/topology_api.py
108
109
110
111
112
113
114
115
116
def remove_link(self, link_id: str, ref: str | None = None):
    """
    Remove a link from the topology

    Args:
        link_id: the link's unique identifier
        ref: an optional reference that identifies this request

    """

remove_node(node_id, ref=None) abstractmethod

Remove a node from the topology

Parameters:

Name Type Description Default
node_id str

the node's unique identifier

required
ref str | None

an optional reference that identifies this request

None
Source code in hyrrokkin/interfaces/topology_api.py
77
78
79
80
81
82
83
84
85
@abstractmethod
def remove_node(self, node_id: str, ref: str | None = None):
    """
    Remove a node from the topology

    Args:
        node_id: the node's unique identifier
        ref: an optional reference that identifies this request
    """

remove_output_listener(node_id, output_port_name) abstractmethod

Remove a listener from a node/port

Parameters:

Name Type Description Default
node_id str

the node id

required
output_port_name str

the name of the node's output port

required
Source code in hyrrokkin/interfaces/topology_api.py
274
275
276
277
278
279
280
281
282
@abstractmethod
def remove_output_listener(self, node_id: str, output_port_name: str):
    """
    Remove a listener from a node/port

    Args:
        node_id: the node id
        output_port_name: the name of the node's output port
    """

restart(ref=None) abstractmethod

Restart execution of the topology

Parameters:

Name Type Description Default
ref str | None

an optional reference that identifies this request

None
Source code in hyrrokkin/interfaces/topology_api.py
167
168
169
170
171
172
173
174
175
176
@abstractmethod
def restart(self, ref: str | None = None):
    """

    Restart execution of the topology

    Args:
        ref: an optional reference that identifies this request

    """

resume(ref=None) abstractmethod

Resume execution of the topology

Parameters:

Name Type Description Default
ref str | None

an optional reference that identifies this request

None
Source code in hyrrokkin/interfaces/topology_api.py
152
153
154
155
156
157
158
159
@abstractmethod
def resume(self, ref: str | None = None):
    """
    Resume execution of the topology

    Args:
        ref: an optional reference that identifies this request
    """

run() abstractmethod

Run the topology until all nodes have completed or failed

Returns: a dictionary containing the error messages returned from any failed nodes

Source code in hyrrokkin/interfaces/topology_api.py
178
179
180
181
182
183
184
185
@abstractmethod
def run(self) -> dict[str, str]:
    """

    Run the topology until all nodes have completed or failed

    Returns: a dictionary containing the error messages returned from any failed nodes
    """

run_task(task_name, input_port_values, output_ports, ref=None, decode_outputs=True) abstractmethod

Run a topology task

Parameters:

Name Type Description Default
task_name str

a descriptive name of the task

required
input_port_values dict[str, bytes | list[bytes]]

map from port identifier (node_id:input_port_name) to a binary-encoded input value or list of values

required
output_ports list[str]

a list of port identifiers (node_id:output_port_name) specifying output ports to return values from

required
ref str | None

an optional reference that identifies this request

None
decode_outputs bool

whether to decode output values from binary or return the binary values

True

Returns:

Type Description
tuple[dict[str, bytes], dict[str, str]]

return a tuple of (dict mapping from output port name to value, dict mapping from node id to error string)

Source code in hyrrokkin/interfaces/topology_api.py
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
@abstractmethod
def run_task(self, task_name: str, input_port_values:dict[str,bytes|list[bytes]], output_ports:list[str], ref: str | None = None, decode_outputs:bool = True) -> tuple[
    dict[str, bytes], dict[str, str]]:
    """
    Run a topology task

    Args:
        task_name: a descriptive name of the task
        input_port_values: map from port identifier (node_id:input_port_name) to a binary-encoded input value or list of values
        output_ports: a list of port identifiers (node_id:output_port_name) specifying output ports to return values from
        ref: an optional reference that identifies this request
        decode_outputs: whether to decode output values from binary or return the binary values

    Returns:
        return a tuple of (dict mapping from output port name to value, dict mapping from node id to error string)
    """

save(to_file=None, include_data=True) abstractmethod

save the topology to a zip file

Parameters:

Name Type Description Default
to_file BufferedWriter

a file opened in binary mode for writing

None
include_data bool

whether or not to include the data in the file

True
Source code in hyrrokkin/interfaces/topology_api.py
387
388
389
390
391
392
393
394
395
@abstractmethod
def save(self, to_file: io.BufferedWriter | None = None, include_data: bool = True):
    """
    save the topology to a zip file

    Args:
        to_file: a file opened in binary mode for writing
        include_data: whether or not to include the data in the file
    """
    # NOTE(review): to_file defaults to None, but what an implementation does
    # in that case is not described here - confirm and document.

serialise() abstractmethod

Serialise the topology structure to JSON

Returns: JSON-serialisable dictionary

Source code in hyrrokkin/interfaces/topology_api.py
397
398
399
400
401
402
403
404
@abstractmethod
def serialise(self):
    """
    Serialise the topology structure to JSON

    Returns: JSON-serialisable dictionary

    """

set_metadata(metadata, ref=None) abstractmethod

Set metadata for this topology

Parameters:

Name Type Description Default
metadata dict[str, str]

a dictionary containing metadata, consisting of string keys and values.

required
ref str | None

an optional reference that identifies this request

None
Notes

the following keys will be understood by hyrrokkin based tools - version, description, authors

Source code in hyrrokkin/interfaces/topology_api.py
32
33
34
35
36
37
38
39
40
41
42
43
@abstractmethod
def set_metadata(self, metadata: dict[str, str], ref: str | None = None):
    """
    Set metadata for this topology

    Args:
        metadata: a dictionary containing metadata, consisting of string keys and values.
        ref: an optional reference that identifies this request

    Notes:
        the following keys will be understood by hyrrokkin based tools - version, description, authors
    """

start(ref=None) abstractmethod

Start execution of the topology

Parameters:

Name Type Description Default
ref str | None

an optional reference that identifies this request

None
Source code in hyrrokkin/interfaces/topology_api.py
128
129
130
131
132
133
134
135
@abstractmethod
def start(self, ref: str | None = None):
    """
    Start execution of the topology

    Args:
        ref: an optional reference that identifies this request
    """

update_node_metadata(node_id, metadata={}, ref=None) abstractmethod

Update a node's metadata

Parameters:

Name Type Description Default
node_id str | None

the node's requested unique identifier or None

required
metadata dict[str, JsonType]

a dictionary containing the new metadata

{}
ref str | None

an optional reference that identifies this request

None
Source code in hyrrokkin/interfaces/topology_api.py
65
66
67
68
69
70
71
72
73
74
75
# NOTE(review): the `metadata` default is a mutable dict literal, which Python
# creates once and shares between calls.  Implementations that mirror this
# signature must not modify the default in place; a None default would be safer.
@abstractmethod
def update_node_metadata(self, node_id: str | None, metadata: dict[str, JsonType] = {},
                         ref: str | None = None) -> None:
    """
    Update a node's metadata

    Args:
        node_id: the node's requested unique identifier or None
        metadata: a dictionary containing the new metadata
        ref: an optional reference that identifies this request
    """