Skip to content

Topology API

Creating, loading and running topologies

Construct a topology using this constructor

Topology

Bases: TopologyApi

Source code in hyrrokkin/api/topology.py
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
class Topology(TopologyApi):

    def __init__(self, topology_folder: str, package_list: list[str], temporary_folder: str | None = None,
                 engine_launcher: Union[EngineLauncher, None] = None,
                 read_only: bool = False,
                 set_engine_pid_callback: Callable[[int], None] = None,
                 import_from_path: str = None
                 ):
        """
        Create a topology instance

        Args:
            topology_folder: the folder used to store the topology's definition and files
            package_list: a list of the paths to python packages containing hyrrokkin package schemas (a schema.json file)
            temporary_folder: a folder used to store files temporarily during execution
            engine_launcher: the engine_launcher to use to run the topology in a remote process.  if not specified, select an appropriate one
                             for the packages loaded
            read_only: if true, do not allow nodes and configurations to persist data/properties changes to disk when running the topology
            set_engine_pid_callback: function that is called with the process identifier (PID) of the engine process if the engine is launched in a sub-process
            import_from_path: import from this path to initialise the topology
        """
        self.closed = False
        self.paused = True
        self.lock = threading.RLock()
        self.set_engine_pid_callback = set_engine_pid_callback
        self.topology = CoreTopology(topology_folder, package_list)
        self.topology.load_dir()
        self.schema = SchemaType(self.topology.get_schema())
        self.execution_complete_event = Event()

        if import_from_path is not None:
            self.topology.import_from(import_from_path)

        self.temporary_folder = temporary_folder
        self.engine_launcher = engine_launcher
        self.read_only = read_only

        self.runner = None

        self.output_listeners = {}
        self.injected_input_values = {}

        self.listeners: list[TopologyListenerAPI] = []
        self.status_events = {}
        self.execution_events = {}

    def __repr__(self):
        return f"hyrrokkin.api.Topology(#nodes={len(self.get_nodes())},#links={len(self.get_links())})"

    def get_package_path(self, package_id):
        return self.topology.get_schema().get_package_path(package_id)

    def close(self):
        self.closed = True
        if self.runner is not None:
            self.runner.stop()
            self.runner.join()
            self.runner.close()

    # methods implementing the TopologyApi interface

    @check_notclosed
    @threadsafe
    def set_metadata(self, metadata: dict[str, str], ref: str | None = None):
        self.topology.set_metadata(metadata)
        for listener in self.listeners:
            listener.design_metadata_updated(metadata, ref=ref)

    @check_notclosed
    @threadsafe
    def add_node(self, node_id: str | None, node_type: str, metadata: dict[str, JsonType] = {},
                 properties: dict[str, JsonType] = {},
                 data: dict[str, bytes] = {}, copy_from_node_id: str = "", ref: str | None = None) -> str:
        node_id = self.topology.add_node(node_id, node_type, metadata, properties, data, copy_from_node_id)
        for listener in self.listeners:
            listener.node_added(node_id, node_type, metadata, ref=ref)
        return node_id

    @check_notclosed
    @threadsafe
    def update_node_metadata(self, node_id: str | None, metadata: dict[str, JsonType] = {},
                             ref: str | None = None) -> None:
        self.topology.update_node_metadata(node_id, metadata)
        for listener in self.listeners:
            listener.node_metadata_updated(node_id, metadata, ref=ref)

    @check_notclosed
    @threadsafe
    def remove_node(self, node_id: str, ref: str | None = None):
        self.topology.remove_node(node_id)
        for listener in self.listeners:
            listener.node_removed(node_id, ref=ref)

    @check_notclosed
    @threadsafe
    def add_link(self, link_id: str | None, from_node_id: str, from_port: str | None, to_node_id: str,
                 to_port: str | None, ref: str | None = None):
        link_id = self.topology.add_link(link_id, from_node_id, from_port, to_node_id, to_port)
        link_type = self.topology.get_link_type(link_id)
        for listener in self.listeners:
            listener.link_added(link_id, link_type, from_node_id, from_port, to_node_id, to_port, ref=ref)
        return link_id

    @check_notclosed
    @threadsafe
    def remove_link(self, link_id: str, ref: str | None = None):
        self.topology.remove_link(link_id)
        for listener in self.listeners:
            listener.link_removed(link_id, ref=ref)

    @check_notclosed
    @threadsafe
    def clear(self, ref: str | None = None):
        self.topology.clear()
        for listener in self.listeners:
            listener.clear(ref=ref)

    @check_notclosed
    @threadsafe
    def start(self, paused=False, ref: str | None = None):
        self.runner = self.topology.open_runner(self.temporary_folder,
                                                status_event_handler=lambda target_type, target_id, msg,
                                                                            status_code: self.status_event_handler(
                                                    target_type, target_id, msg, status_code),
                                                execution_event_handler=lambda timestamp, node_id, state, error,
                                                                               is_manual: self.execution_event_handler(
                                                    timestamp, node_id, state, error, is_manual),
                                                engine_launcher=self.engine_launcher,
                                                read_only=self.read_only,
                                                paused=True,
                                                set_engine_pid_callback=self.set_engine_pid_callback)
        self.runner.set_execution_complete_callback(lambda: self.execution_complete_handler())
        self.runner.set_request_open_client_callback(
            lambda origin_id, origin_type, session_id, client_name: self.request_open_client(origin_id, origin_type,
                                                                                             session_id, client_name))
        for (node_id, input_port_name) in self.injected_input_values:
            self.runner.inject_input_value(node_id, input_port_name, self.injected_input_values[(node_id, input_port_name)])
        for (node_id, output_port_name) in self.output_listeners:
            self.runner.add_output_listener(node_id, output_port_name, self.output_listeners[(node_id, output_port_name)])

        self.runner.start()

        for listener in self.listeners:
            listener.started(ref=ref)

        self.paused = paused

        if not paused:
            self.runner.resume()
            for listener in self.listeners:
                listener.resumed(ref=ref)

    @check_notclosed
    @threadsafe
    def is_started(self):
        return self.runner is not None

    @check_notclosed
    @threadsafe
    def pause(self, ref: str | None = None):
        if self.paused is False:
            self.paused = True
            if self.runner is not None:
                self.runner.pause()
            for listener in self.listeners:
                listener.paused(ref=ref)

    @check_notclosed
    @threadsafe
    def resume(self, ref: str | None = None):
        if self.paused is True:
            self.paused = False
            if not self.runner:
                self.start(ref)
            else:
                self.runner.resume()
            for listener in self.listeners:
                listener.resumed(ref=ref)

    @check_notclosed
    @threadsafe
    def is_paused(self):
        return self.paused

    @check_notclosed
    @threadsafe
    def restart(self, ref: str | None = None):
        if not self.runner:
            self.runner.start(ref)
        else:
            self.runner.restart(ref=ref)
            for listener in self.listeners:
                listener.restarted(ref=ref)

    @check_notclosed
    def run(self):
        self.resume()
        self.execution_complete_event.wait()
        self.pause()
        with self.lock:
            return self.runner.get_failures()

    @check_notclosed
    def run_task(self, task_name, input_port_values, output_ports, ref: str | None = None):
        if self.runner is None:
            self.start(ref)
        with self.lock:
            task_id = self.runner.submit_task(task_name, input_port_values, output_ports)
        return self.runner.wait_for_task(task_id)

    @check_notclosed
    @threadsafe
    def reload_node(self, node_id: str, properties: JsonType, data: dict[str, bytes], ref: str | None = None):
        if self.runner is not None:
            listeners = self.listeners[:]

            def on_reload():
                for listener in listeners:
                    listener.node_reloaded(node_id, ref=ref)

            self.runner.reload_node(node_id, properties, data, on_reload=on_reload)
        else:
            self.topology.set_node_properties(node_id, properties)
            for key,value in data.items():
                self.topology.set_node_data(node_id, key, value)
            for listener in self.listeners:
                listener.node_reloaded(node_id, ref=ref)

    @check_notclosed
    @threadsafe
    def add_output_listener(self, node_id: str, output_port_name: str, listener: Callable[[bytes], None]):
        self.output_listeners[(node_id, output_port_name)] = listener
        if self.runner is not None:
            self.runner.add_output_listener(node_id, output_port_name, listener)

    @check_notclosed
    @threadsafe
    def remove_output_listener(self, node_id: str, output_port_name: str):
        del self.output_listeners[(node_id, output_port_name)]
        if self.runner is not None:
            self.runner.remove_output_listener(node_id, output_port_name)

    @check_notclosed
    @threadsafe
    def inject_input_value(self, node_id: str, input_port_name: str, value: bytes | list[bytes]):
        self.injected_input_values[(node_id, input_port_name)] = value
        if self.runner is not None:
            self.runner.inject_input_value(node_id, input_port_name, value)

    ####################################################################################################################
    # session and client related

    @check_notclosed
    @threadsafe
    def open_session(self, session_id: str | None = None) -> str:
        if self.runner is None:
            self.start(paused=self.paused)
        return self.runner.open_session(session_id)

    @check_notclosed
    @threadsafe
    def close_session(self, session_id: str):
        if self.runner is None:
            self.start(paused=self.paused)
        self.runner.close_session(session_id)

    @check_notclosed
    @threadsafe
    def attach_node_client(self, node_id: str, session_id: str = "", client_id: str = "",
                           client_options: dict = {}) -> ClientApi:
        if self.runner is None:
            self.start(paused=self.paused)
        return self.runner.attach_node_client(node_id, session_id, client_id, client_options)

    @check_notclosed
    @threadsafe
    def attach_configuration_client(self, package_id: str, session_id: str = "", client_id: str = "",
                                    client_options: dict = {}) -> ClientApi:
        if self.runner is None:
            self.start()
        return self.runner.attach_configuration_client(package_id, session_id, client_id, client_options)

    ####################################################################################################################
    # retrieve node properties and data

    @threadsafe
    def get_node_properties(self, node_id: str) -> dict[str, JsonType]:
        return self.topology.get_node_properties(node_id)

    @threadsafe
    def get_node_data(self, node_id: str, key: str) -> bytes | None:
        return self.topology.get_node_data(node_id, key)

    @threadsafe
    def get_node_data_keys(self, node_id: str) -> list[str]:
        return self.topology.get_node_data_keys(node_id)

    ####################################################################################################################
    # load and save

    def __note_added(self, added_node_ids, added_link_ids, ref):
        for node_id in added_node_ids:
            (package_id, node_type_name) = self.topology.get_node_type(node_id)
            node_type_id = f"{package_id}:{node_type_name}"
            node_metadata = self.topology.get_node_metadata(node_id)
            for listener in self.listeners:
                listener.node_added(node_id, node_type_id, node_metadata, ref=ref)

        for link_id in added_link_ids:
            (from_node_id, from_port_name, to_node_id, to_port_name) = self.topology.get_link(link_id)
            link_type_id = self.topology.get_link_type(link_id)
            for listener in self.listeners:
                listener.link_added(link_id, link_type_id, from_node_id, from_port_name, to_node_id, to_port_name,
                                    ref=ref)

    @threadsafe
    def load(self, from_file: io.BytesIO, include_data: bool = True, ref: str | None = None) -> Tuple[
        list[str], list[str], dict[str, str]]:
        # if the runner has not been created, can load configuration
        # once the runner has been created, the configuration into an engine has been loaded and there is no real way
        # to update it
        (added_node_ids, added_link_ids, node_renamings) = self.topology.load_zip(from_file, include_data=include_data,
                                                                                  include_configuration=(self.runner is None),
                                                                                  include_metadata=(self.get_metadata()=={}))
        self.__note_added(added_node_ids, added_link_ids, ref=ref)
        return (added_node_ids, added_link_ids, node_renamings)

    @threadsafe
    def save(self, to_file: io.BufferedWriter = None, include_data: bool = True):
        return self.topology.save_zip(to_file, include_data)

    @threadsafe
    def import_from(self, from_path: str, include_data: bool = True, ref: str | None = None) -> Tuple[
        list[str], list[str], dict[str, str]]:
        # if the runner has not been created, can load configuration and topology metadata
        # once the runner has been created, the configuration into an engine has been loaded and there is no real way
        # to update it
        if from_path.endswith(".zip"):
            with open(from_path, "rb") as f:
                (added_node_ids, added_link_ids, node_renamings) = self.topology.load_zip(f, include_data=include_data,
                                                                                          include_configuration=(self.runner is None),
                                                                                          include_metadata=(self.get_metadata()=={}))
        else:
            (added_node_ids, added_link_ids, node_renamings) = self.topology.import_from(from_path, include_data,
                                                                                         include_configuration=(self.runner is None),
                                                                                         include_metadata=(self.get_metadata()=={}))

        self.__note_added(added_node_ids, added_link_ids, ref=ref)
        return (added_node_ids, added_link_ids, node_renamings)

    @threadsafe
    def export_to(self, to_path: str, include_data: bool = True):
        to_dir = os.path.split(to_path)[0]
        if to_dir:
            os.makedirs(to_dir, exist_ok=True)
        if to_path.endswith(".zip"):
            with open(to_path, "wb") as f:
                self.save(f, include_data=include_data)
        else:
            self.topology.export_to(to_path, include_data)

    @threadsafe
    def serialise(self):
        return self.topology.serialise()

    ####################################################################################################################
    # topology introspection

    @threadsafe
    def get_metadata(self) -> dict[str, JsonType]:
        return self.topology.get_metadata()

    @threadsafe
    def get_nodes(self) -> dict[str, NodeTypeApi]:
        nodes = {}
        for node_id, node in self.topology.get_nodes().items():
            node_type = node.get_node_type()
            package_id, node_type_id = node_type.split(":")
            nodes[node_id] = self.schema.get_packages()[package_id].get_node_types()[node_type_id]
        return nodes

    @threadsafe
    def get_links(self) -> dict[str, tuple[LinkTypeApi, str, str]]:
        links = {}
        for link_id, link in self.topology.get_links().items():
            link_type = link.get_link_type()
            from_port_id = link.from_node_id + ":" + link.from_port
            to_port_id = link.to_node_id + ":" + link.to_port
            links[link_id] = (link_type, from_port_id, to_port_id)
        return links

    @threadsafe
    def get_link_ids_for_node(self, node_id: str) -> list[str]:
        return self.topology.get_link_ids_for_node(node_id)

    ####################################################################################################################
    # schema introspection

    @threadsafe
    def get_schema(self) -> SchemaTypeApi:
        return self.schema

    ####################################################################################################################
    # listeners

    def attach_listener(self, listener: TopologyListenerAPI) -> None:
        self.listeners.append(listener)
        for (target_type, target_id), (msg, status_code) in self.status_events.items():
            listener.status_event(target_type, target_id, msg, status_code)
        for (node_id, (timestamp, state, error, is_manual)) in self.execution_events.items():
            listener.execution_event(node_id, timestamp, state, error, is_manual)

    def detach_listener(self, listener: TopologyListenerAPI) -> None:
        self.listeners.remove(listener)

    def status_event_handler(self, target_type: Literal["node"] | Literal["configuration"],
                             target_id: str, msg: str,
                             status_code: Literal["error"] | Literal["warning"] | Literal["info"] | Literal["log"]):
        self.status_events[(target_type, target_id)] = (msg, status_code)
        for listener in self.listeners:
            listener.status_event(target_type, target_id, msg, status_code)

    def execution_event_handler(self, timestamp: float | None,
                                node_id: str,
                                state: Literal["pending"] | Literal["running"] | Literal["completed"] | Literal[
                                    "failed"],
                                error: str | None, is_manual: bool) -> None:
        self.execution_events[node_id] = (timestamp, state, error, is_manual)
        for listener in self.listeners:
            listener.execution_event(timestamp, node_id, state, error, is_manual)

    def execution_complete_handler(self):
        for listener in self.listeners:
            listener.execution_completed()
        self.execution_complete_event.set()

    def request_open_client(self, origin_id, origin_type, session_id, client_name):
        for listener in self.listeners:
            listener.request_open_client(origin_id, origin_type, session_id, client_name)

__init__(topology_folder, package_list, temporary_folder=None, engine_launcher=None, read_only=False, set_engine_pid_callback=None, import_from_path=None)

Create a topology instance

Parameters:

Name Type Description Default
topology_folder str

the folder used to store the topology's definition and files

required
package_list list[str]

a list of the paths to python packages containing hyrrokkin package schemas (a schema.json file)

required
temporary_folder str | None

a folder used to store files temporarily during execution

None
engine_launcher Union[EngineLauncher, None]

the engine_launcher to use to run the topology in a remote process. if not specified, select an appropriate one for the packages loaded

None
read_only bool

if true, do not allow nodes and configurations to persist data/properties changes to disk when running the topology

False
set_engine_pid_callback Callable[[int], None]

function that is called with the process identifier (PID) of the engine process if the engine is launched in a sub-process

None
import_from_path str

import from this path to initialise the topology

None
Source code in hyrrokkin/api/topology.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def __init__(self, topology_folder: str, package_list: list[str], temporary_folder: str | None = None,
             engine_launcher: Union[EngineLauncher, None] = None,
             read_only: bool = False,
             set_engine_pid_callback: Callable[[int], None] = None,
             import_from_path: str = None
             ):
    """
    Create a topology instance

    Args:
        topology_folder: the folder used to store the topology's definition and files
        package_list: a list of the paths to python packages containing hyrrokkin package schemas (a schema.json file)
        temporary_folder: a folder used to store files temporarily during execution
        engine_launcher: the engine_launcher to use to run the topology in a remote process.  if not specified, select an appropriate one
                         for the packages loaded
        read_only: if true, do not allow nodes and configurations to persist data/properties changes to disk when running the topology
        set_engine_pid_callback: function that is called with the process identifier (PID) of the engine process if the engine is launched in a sub-process
        import_from_path: import from this path to initialise the topology
    """
    self.closed = False
    self.paused = True
    self.lock = threading.RLock()
    self.set_engine_pid_callback = set_engine_pid_callback
    self.topology = CoreTopology(topology_folder, package_list)
    self.topology.load_dir()
    self.schema = SchemaType(self.topology.get_schema())
    self.execution_complete_event = Event()

    if import_from_path is not None:
        self.topology.import_from(import_from_path)

    self.temporary_folder = temporary_folder
    self.engine_launcher = engine_launcher
    self.read_only = read_only

    self.runner = None

    self.output_listeners = {}
    self.injected_input_values = {}

    self.listeners: list[TopologyListenerAPI] = []
    self.status_events = {}
    self.execution_events = {}

check_notclosed(func)

Decorator that prevents access to a method once the closed attribute is set to True :param func: the method to be decorated :return: wrapped method

Source code in hyrrokkin/api/topology.py
35
36
37
38
39
40
41
42
43
44
45
46
47
def check_notclosed(func):
    """
    Decorator that prevents access to a method once the closed attribute is set to True
    :param func: the method to be decorated
    :return: wrapped method
    """

    def notclosed_wrapper(self, *args, **kwargs):
        if self.closed:
            raise Exception("Topology is closed")
        return func(self, *args, **kwargs)

    return notclosed_wrapper

A topology inherits the following methods:

TopologyApi

Bases: ABC

Source code in hyrrokkin/interfaces/topology_api.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
class TopologyApi(ABC):

    @abstractmethod
    def set_metadata(self, metadata: dict[str, str], ref: str | None = None):
        """
        Set metadata for this topology

        Args:
            metadata: a dictionary containing metadata, consisting of string keys and values.
            ref: an optional reference that identifies this request

        Notes:
            the following keys will be understood by hyrrokkin based tools - version, description, authors
        """

    @abstractmethod
    def add_node(self, node_id: str | None, node_type: str, metadata: dict[str, JsonType] = {},
                 properties: dict[str, JsonType] = {},
                 data: dict[str, bytes] = {}, copy_from_node_id: str = "", ref: str | None = None) -> str:
        """
        Add a node to the topology

        Args:
            node_id: the node's requested unique identifier or None
            node_type: the type of the node, a string of the form package_id:node_type_id
            metadata: a dictionary containing the new metadata
            properties: dictionary containing the node's property names and values, must be JSON serialisable
            data: if set, initialise with the properties and data copied from this node
            copy_from_node_id: if set, initialise with the properties and data copied from this node rather than supplied in the data and properties arguments
            ref: an optional reference that identifies this request

        Returns:
            the id of the added node
        """

    @abstractmethod
    def update_node_metadata(self, node_id: str | None, metadata: dict[str, JsonType] = {},
                             ref: str | None = None) -> None:
        """
        Update a node's metadata

        Args:
            node_id: the node's requested unique identifier or None
            metadata: a dictionary containing the new metadata
            ref: an optional reference that identifies this request
        """

    @abstractmethod
    def remove_node(self, node_id: str, ref: str | None = None):
        """
        Remove a node from the topology

        Args:
            node_id: the node's unique identifier
            ref: an optional reference that identifies this request
        """

    @abstractmethod
    def add_link(self, link_id: str, from_node_id: str, from_port: str | None, to_node_id: str,
                 to_port: str | None, ref: str | None = None) -> str:
        """
        Add a link to the topology

        Args:
            link_id: a requested unique identifier for the link
            from_node_id: node id of the source node
            from_port: port name on the source node, can be omitted if the "from" node has only one output port
            to_node_id: node id of the destination node
            to_port: port name on the destination node, can be omitted if the "to" node has only one input port
            ref: an optional reference that identifies this request

        Raises:
            InvalidLinkError: if the link cannot be added

        Returns:
            link_id of the added link
        """

    def remove_link(self, link_id: str, ref: str | None = None):
        """
        Remove a link from the topology

        Args:
            link_id: the link's unique identifier
            ref: an optional reference that identifies this request

        """

    @abstractmethod
    def clear(self, ref: str | None = None):
        """
        Remove all nodes and links from the topology

        Args:
            ref: an optional reference that identifies this request

        """

    @abstractmethod
    def start(self, ref: str | None = None):
        """
        Start execution of the topology

        Args:
            ref: an optional reference that identifies this request
        """

    @abstractmethod
    def is_started(self):
        """
        Returns: True if the topology is executing
        """

    @abstractmethod
    def pause(self, ref: str | None = None):
        """
        Pause execution of the topology.  Until resume is called, no new nodes will start running.

        Args:
            ref: an optional reference that identifies this request
        """

    @abstractmethod
    def resume(self, ref: str | None = None):
        """
        Resume execution of the topology

        Args:
            ref: an optional reference that identifies this request
        """

    @abstractmethod
    def is_paused(self):
        """
        Returns: True if the topology is paused
        """

    @abstractmethod
    def restart(self, ref: str | None = None):
        """

        Restart execution of the topology

        Args:
            ref: an optional reference that identifies this request

        """

    @abstractmethod
    def run(self) -> dict[str, str]:
        """

        Run the topology until all nodes have completed or failed

        Returns: a dictionary containing the error messages returned from any failed nodes
        """

    @abstractmethod
    def run_task(self, task_name: str, input_port_values:dict[str,bytes|list[bytes]], output_ports:list[str], ref: str | None = None) -> tuple[
        dict[str, bytes], dict[str, str]]:
        """
        Run a topology task

        Args:
            task_name: a descriptive name of the task
            input_port_values: map from port identifier (node_id:input_port_name) to a binary-encoded input value or list of values
            output_ports: a list of port identifiers (node_id:output_port_name) specifying output ports to return values from
            ref: an optional reference that identifies this request

        Returns:
            return a tuple of (dict mapping from output port name to value, dict mapping from node id to error string)
        """

    @abstractmethod
    def reload_node(self, node_id: str, properties: JsonType, data: dict[str, bytes], ref: str | None = None):
        """
        Reload a node with new properties and data, triggering re-execution of the node and all downstream nodes
        if the topology execution has already started

        Reloading creates a new instance of the node with the new properties and data

        Args:
            node_id: the id of the node to reload
            properties: the properties to reload
            data: the data to reload
            ref: an optional reference that identifies this request
        """

    ####################################################################################################################
    # retrieve node properties and data

    @abstractmethod
    def get_node_properties(self, node_id: str) -> dict[str, JsonType]:
        """
        Gets the properties of a node

        Args:
            node_id: the node's identifier

        Returns:
            dictionary containing properties
        """

    @abstractmethod
    def get_node_data(self, node_id: str, key: str) -> bytes | None:
        """
        Get binary data associated with this node.

        Args:
            node_id: node identifier
            key: a key to locate the data (can only contain alphanumeric characters and underscores)

        Returns:
            data or None if no data is associated with the key
        """

    @abstractmethod
    def get_node_data_keys(self, node_id: str) -> list[str]:
        """
        Get the list of keys for which the node stores binary data

        Args:
            node_id: node identifier

        Returns:
            a set of data keys
        """

    ####################################################################################################################
    # interact with the topology

    @abstractmethod
    def add_output_listener(self, node_id: str, output_port_name: str, listener: Callable[[bytes], None]):
        """
        Listen for values output from a node in the topology.  Replaces any existing listener on the node/port if present.

        Args:
            node_id: the node id
            output_port_name: the name of the node's output port
            listener: a callback function which is invoked with the value on the output port when the node is run
        """

    @abstractmethod
    def remove_output_listener(self, node_id: str, output_port_name: str):
        """
        Remove a listener from a node/port

        Args:
            node_id: the node id
            output_port_name: the name of the node's output port
        """

    @abstractmethod
    def inject_input_value(self, node_id: str, input_port_name: str, value: bytes | list[bytes]):
        """
        Inject input values into a node in the topology, via an input port.  The port must not be connected.

        Args:
            node_id: the node id
            input_port_name: the name of the node's input port
            value: the value to inject - encoded as bytes.  For ports that accept multiple connections, a list of bytes may be provided.
        """

    ####################################################################################################################
    # session and client related

    @abstractmethod
    def open_session(self, session_id: str | None = None) -> str:
        """
        Open a new interactive session

        Args:
            session_id: the identifier of the session or None to generate a new session identifier

        Returns:
            the session identifier for the opened session
        """

    @abstractmethod
    def close_session(self, session_id: str):
        """
        Close a session

        Args:
            session_id: the identifier of the session to close
        """

    @abstractmethod
    def attach_node_client(self, node_id: str, session_id: str = "", client_id: str = "",
                           client_options: dict = {}) -> ClientApi:
        """
        Attach a client instance to a node.  Any client already attached to the node with the same client_id
        will be detached.

        Args:
            node_id: the node to which the client is to be attached
            session_id: the id of an opened interactive session
            client_id: the name of the client to attach, as defined in the node's schema
            client_options: optional, a dictionary with extra parameters from the client

        Returns:
             an instance which implements the Client API and provides methods to interact with the node

        """

    @abstractmethod
    def attach_configuration_client(self, package_id: str, session_id: str = "", client_id: str = "",
                                    client_options: dict = {}) -> ClientApi:
        """
        Attach a client instance to a package configuration

        Args:
            package_id: the package configuration to which the client is to be attached
            session_id: the id of an opened interactive session
            client_id: the id of the client to attach
            client_options: optional, a dictionary with extra parameters for the client

        Returns:
             an object which implements the Client API and provides methods to interact with the configuration
        """

    ####################################################################################################################
    # load and save

    @abstractmethod
    def load(self, from_file: io.BytesIO, include_data: bool = True, ref: str | None = None) -> Tuple[
        list[str], list[str], dict[str, str]]:
        """
        Merge the nodes and links topology from an opened ZIP file into this topology.  Node and link ids will be renamed to avoid clashes.

        Args:
            from_file: a file opened in binary mode
            include_data: whether or not to include the data from the file

        Returns:
            a tuple containing the set of added node ids, the set of added link ids, and
            a dictionary containing any node renamings performed to avoid id collisions with existing nodes
            ref: an optional reference that identifies this request

        Notes:
            configuration properties/data will not be loaded if this method is called after the topology is started
            topology metadata will not be loaded if topology metadata already exists
        """

    @abstractmethod
    def save(self, to_file: io.BufferedWriter = None, include_data: bool = True):
        """
        save the topology to a zip file

        Args:
            to_file: a file opened in binary mode for writing
            include_data: whether or not to include the data in the file
        """

    @abstractmethod
    def serialise(self):
        """
        Serialise the topology structure to JSON

        Returns: JSON-serialisable dictionary

        """

    @abstractmethod
    def import_from(self, from_path: str, include_data: bool = True, ref: str | None = None) -> Tuple[
        list[str], list[str], dict[str, str]]:
        """
        Merge the nodes and links topology from a YAML file or ZIP file into this topology.  Node and link ids will be renamed to avoid clashes.

        Args:
            from_path: the path to a YAML or ZIP file describing the topology
            include_data: whether or not to include any data referenced in the YAML file
            ref: an optional reference that identifies this request

        Returns:
            a tuple containing the set of added node ids, the set of added link ids, and
            a dictionary containing any node renamings performed to avoid id collisions with existing nodes

        Notes:
            configuration properties/data will not be loaded if this method is called after the topology is started
            topology metadata will not be loaded if topology metadata already exists
        """

    @abstractmethod
    def export_to(self, to_path: str, include_data: bool = True):
        """
        save the topology to a zip file or yaml file

        Args:
            to_path: the path to the YAML file or ZIP file to export to
            include_data: whether or not to export data as well as properties
        """

    ####################################################################################################################
    # topology introspection

    @abstractmethod
    def get_nodes(self) -> dict[str, NodeTypeApi]:
        """
        Get details of the node ids and types in the topology

        Returns:
            dict containing a mapping from node id to node type
        """

    @abstractmethod
    def get_links(self) -> dict[str, tuple[LinkTypeApi, str, str]]:
        """
        Get details of the link ids and link types in the topology

        Returns:
            dict containing a mapping from link id to a tuple of format (link_type,<node_id>:<from_port_name>,<node_id>:<to_port_name>)>
        """

    @abstractmethod
    def get_link_ids_for_node(self, node_id: str) -> list[str]:
        """
        Get the ids of all links in the topology that are connected to a node

        Args:
            node_id: the id of the node

        Returns:
            list of link ids connected to the node
        """

    ####################################################################################################################
    # schema introspection

    def get_schema(self) -> SchemaTypeApi:
        """

        Returns: an instance implementing the SchemaApi allowing the packahes, node types and link types in the schema to be examined

        """

    ####################################################################################################################
    # Attach and Detach Listeners

    @abstractmethod
    def attach_listener(self, listener: TopologyListenerAPI) -> None:
        """
        Attach a listener instance which implements the TopologyListenerAPI

        Args:
            listener: the listener instance
        """

    @abstractmethod
    def detach_listener(self, listener: TopologyListenerAPI) -> None:
        """
        Detach a listener

        Args:
            listener: the listener instance
        """

Add a link to the topology

Parameters:

Name Type Description Default
link_id str

a requested unique identifier for the link

required
from_node_id str

node id of the source node

required
from_port str | None

port name on the source node, can be omitted if the "from" node has only one output port

required
to_node_id str

node id of the destination node

required
to_port str | None

port name on the destination node, can be omitted if the "to" node has only one input port

required
ref str | None

an optional reference that identifies this request

None

Raises:

Type Description
InvalidLinkError

if the link cannot be added

Returns:

Type Description
str

link_id of the added link

Source code in hyrrokkin/interfaces/topology_api.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
@abstractmethod
def add_link(self, link_id: str, from_node_id: str, from_port: str | None, to_node_id: str,
             to_port: str | None, ref: str | None = None) -> str:
    """
    Add a link to the topology

    Args:
        link_id: a requested unique identifier for the link
        from_node_id: node id of the source node
        from_port: port name on the source node, can be omitted if the "from" node has only one output port
        to_node_id: node id of the destination node
        to_port: port name on the destination node, can be omitted if the "to" node has only one input port
        ref: an optional reference that identifies this request

    Raises:
        InvalidLinkError: if the link cannot be added

    Returns:
        link_id of the added link
    """

add_node(node_id, node_type, metadata={}, properties={}, data={}, copy_from_node_id='', ref=None) abstractmethod

Add a node to the topology

Parameters:

Name Type Description Default
node_id str | None

the node's requested unique identifier or None

required
node_type str

the type of the node, a string of the form package_id:node_type_id

required
metadata dict[str, JsonType]

a dictionary containing the new metadata

{}
properties dict[str, JsonType]

dictionary containing the node's property names and values, must be JSON serialisable

{}
data dict[str, bytes]

if set, initialise with the properties and data copied from this node

{}
copy_from_node_id str

if set, initialise with the properties and data copied from this node rather than supplied in the data and properties arguments

''
ref str | None

an optional reference that identifies this request

None

Returns:

Type Description
str

the id of the added node

Source code in hyrrokkin/interfaces/topology_api.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
@abstractmethod
def add_node(self, node_id: str | None, node_type: str, metadata: dict[str, JsonType] = {},
             properties: dict[str, JsonType] = {},
             data: dict[str, bytes] = {}, copy_from_node_id: str = "", ref: str | None = None) -> str:
    """
    Add a node to the topology

    Args:
        node_id: the node's requested unique identifier or None
        node_type: the type of the node, a string of the form package_id:node_type_id
        metadata: a dictionary containing the new metadata
        properties: dictionary containing the node's property names and values, must be JSON serialisable
        data: if set, initialise with the properties and data copied from this node
        copy_from_node_id: if set, initialise with the properties and data copied from this node rather than supplied in the data and properties arguments
        ref: an optional reference that identifies this request

    Returns:
        the id of the added node
    """

add_output_listener(node_id, output_port_name, listener) abstractmethod

Listen for values output from a node in the topology. Replaces any existing listener on the node/port if present.

Parameters:

Name Type Description Default
node_id str

the node id

required
output_port_name str

the name of the node's output port

required
listener Callable[[bytes], None]

a callback function which is invoked with the value on the output port when the node is run

required
Source code in hyrrokkin/interfaces/topology_api.py
261
262
263
264
265
266
267
268
269
270
@abstractmethod
def add_output_listener(self, node_id: str, output_port_name: str, listener: Callable[[bytes], None]):
    """
    Listen for values output from a node in the topology.  Replaces any existing listener on the node/port if present.

    Args:
        node_id: the node id
        output_port_name: the name of the node's output port
        listener: a callback function which is invoked with the value on the output port when the node is run
    """

attach_configuration_client(package_id, session_id='', client_id='', client_options={}) abstractmethod

Attach a client instance to a package configuration

Parameters:

Name Type Description Default
package_id str

the package configuration to which the client is to be attached

required
session_id str

the id of an opened interactive session

''
client_id str

the id of the client to attach

''
client_options dict

optional, a dictionary with extra parameters for the client

{}

Returns:

Type Description
ClientApi

an object which implements the Client API and provides methods to interact with the configuration

Source code in hyrrokkin/interfaces/topology_api.py
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
@abstractmethod
def attach_configuration_client(self, package_id: str, session_id: str = "", client_id: str = "",
                                client_options: dict = {}) -> ClientApi:
    """
    Attach a client instance to a package configuration

    Args:
        package_id: the package configuration to which the client is to be attached
        session_id: the id of an opened interactive session
        client_id: the id of the client to attach
        client_options: optional, a dictionary with extra parameters for the client

    Returns:
         an object which implements the Client API and provides methods to interact with the configuration
    """

attach_listener(listener) abstractmethod

Attach a listener instance which implements the TopologyListenerAPI

Parameters:

Name Type Description Default
listener TopologyListenerAPI

the listener instance

required
Source code in hyrrokkin/interfaces/topology_api.py
469
470
471
472
473
474
475
476
@abstractmethod
def attach_listener(self, listener: TopologyListenerAPI) -> None:
    """
    Attach a listener instance which implements the TopologyListenerAPI

    Args:
        listener: the listener instance
    """

attach_node_client(node_id, session_id='', client_id='', client_options={}) abstractmethod

Attach a client instance to a node. Any client already attached to the node with the same client_id will be detached.

Parameters:

Name Type Description Default
node_id str

the node to which the client is to be attached

required
session_id str

the id of an opened interactive session

''
client_id str

the name of the client to attach, as defined in the node's schema

''
client_options dict

optional, a dictionary with extra parameters from the client

{}

Returns:

Type Description
ClientApi

an instance which implements the Client API and provides methods to interact with the node

Source code in hyrrokkin/interfaces/topology_api.py
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
@abstractmethod
def attach_node_client(self, node_id: str, session_id: str = "", client_id: str = "",
                       client_options: dict = {}) -> ClientApi:
    """
    Attach a client instance to a node.  Any client already attached to the node with the same client_id
    will be detached.

    Args:
        node_id: the node to which the client is to be attached
        session_id: the id of an opened interactive session
        client_id: the name of the client to attach, as defined in the node's schema
        client_options: optional, a dictionary with extra parameters from the client

    Returns:
         an instance which implements the Client API and provides methods to interact with the node

    """

clear(ref=None) abstractmethod

Remove all nodes and links from the topology

Parameters:

Name Type Description Default
ref str | None

an optional reference that identifies this request

None
Source code in hyrrokkin/interfaces/topology_api.py
118
119
120
121
122
123
124
125
126
@abstractmethod
def clear(self, ref: str | None = None):
    """
    Remove all nodes and links from the topology

    Args:
        ref: an optional reference that identifies this request

    """

close_session(session_id) abstractmethod

Close a session

Parameters:

Name Type Description Default
session_id str

the identifier of the session to close

required
Source code in hyrrokkin/interfaces/topology_api.py
308
309
310
311
312
313
314
315
@abstractmethod
def close_session(self, session_id: str):
    """
    Close a session

    Args:
        session_id: the identifier of the session to close
    """

detach_listener(listener) abstractmethod

Detach a listener

Parameters:

Name Type Description Default
listener TopologyListenerAPI

the listener instance

required
Source code in hyrrokkin/interfaces/topology_api.py
478
479
480
481
482
483
484
485
@abstractmethod
def detach_listener(self, listener: TopologyListenerAPI) -> None:
    """
    Detach a listener

    Args:
        listener: the listener instance
    """

export_to(to_path, include_data=True) abstractmethod

save the topology to a zip file or yaml file

Parameters:

Name Type Description Default
to_path str

the path to the YAML file or ZIP file to export to

required
include_data bool

whether or not to export data as well as properties

True
Source code in hyrrokkin/interfaces/topology_api.py
413
414
415
416
417
418
419
420
421
@abstractmethod
def export_to(self, to_path: str, include_data: bool = True):
    """
    save the topology to a zip file or yaml file

    Args:
        to_path: the path to the YAML file or ZIP file to export to
        include_data: whether or not to export data as well as properties
    """

Get the ids of all links in the topology that are connected to a node

Parameters:

Name Type Description Default
node_id str

the id of the node

required

Returns:

Type Description
list[str]

list of link ids connected to the node

Source code in hyrrokkin/interfaces/topology_api.py
444
445
446
447
448
449
450
451
452
453
454
@abstractmethod
def get_link_ids_for_node(self, node_id: str) -> list[str]:
    """
    Get the ids of all links in the topology that are connected to a node

    Args:
        node_id: the id of the node

    Returns:
        list of link ids connected to the node
    """

Get details of the link ids and link types in the topology

Returns:

Type Description
dict[str, tuple[LinkTypeApi, str, str]]

dict containing a mapping from link id to a tuple of format (link_type,:,:)>

Source code in hyrrokkin/interfaces/topology_api.py
435
436
437
438
439
440
441
442
@abstractmethod
def get_links(self) -> dict[str, tuple[LinkTypeApi, str, str]]:
    """
    Get details of the link ids and link types in the topology

    Returns:
        dict containing a mapping from link id to a tuple of format (link_type,<node_id>:<from_port_name>,<node_id>:<to_port_name>)>
    """

get_node_data(node_id, key) abstractmethod

Get binary data associated with this node.

Parameters:

Name Type Description Default
node_id str

node identifier

required
key str

a key to locate the data (can only contain alphanumeric characters and underscores)

required

Returns:

Type Description
bytes | None

data or None if no data is associated with the key

Source code in hyrrokkin/interfaces/topology_api.py
233
234
235
236
237
238
239
240
241
242
243
244
@abstractmethod
def get_node_data(self, node_id: str, key: str) -> bytes | None:
    """
    Get binary data associated with this node.

    Args:
        node_id: node identifier
        key: a key to locate the data (can only contain alphanumeric characters and underscores)

    Returns:
        data or None if no data is associated with the key
    """

get_node_data_keys(node_id) abstractmethod

Get the list of keys for which the node stores binary data

Parameters:

Name Type Description Default
node_id str

node identifier

required

Returns:

Type Description
list[str]

a set of data keys

Source code in hyrrokkin/interfaces/topology_api.py
246
247
248
249
250
251
252
253
254
255
256
@abstractmethod
def get_node_data_keys(self, node_id: str) -> list[str]:
    """
    Get the list of keys for which the node stores binary data

    Args:
        node_id: node identifier

    Returns:
        a set of data keys
    """

get_node_properties(node_id) abstractmethod

Gets the properties of a node

Parameters:

Name Type Description Default
node_id str

the node's identifier

required

Returns:

Type Description
dict[str, JsonType]

dictionary containing properties

Source code in hyrrokkin/interfaces/topology_api.py
221
222
223
224
225
226
227
228
229
230
231
@abstractmethod
def get_node_properties(self, node_id: str) -> dict[str, JsonType]:
    """
    Gets the properties of a node

    Args:
        node_id: the node's identifier

    Returns:
        dictionary containing properties
    """

get_nodes() abstractmethod

Get details of the node ids and types in the topology

Returns:

Type Description
dict[str, NodeTypeApi]

dict containing a mapping from node id to node type

Source code in hyrrokkin/interfaces/topology_api.py
426
427
428
429
430
431
432
433
@abstractmethod
def get_nodes(self) -> dict[str, NodeTypeApi]:
    """
    Get details of the node ids and types in the topology

    Returns:
        dict containing a mapping from node id to node type
    """

get_schema()

Returns: an instance implementing the SchemaApi allowing the packahes, node types and link types in the schema to be examined

Source code in hyrrokkin/interfaces/topology_api.py
459
460
461
462
463
464
def get_schema(self) -> SchemaTypeApi:
    """

    Returns: an instance implementing the SchemaApi allowing the packahes, node types and link types in the schema to be examined

    """

import_from(from_path, include_data=True, ref=None) abstractmethod

Merge the nodes and links topology from a YAML file or ZIP file into this topology. Node and link ids will be renamed to avoid clashes.

Parameters:

Name Type Description Default
from_path str

the path to a YAML or ZIP file describing the topology

required
include_data bool

whether or not to include any data referenced in the YAML file

True
ref str | None

an optional reference that identifies this request

None

Returns:

Type Description
list[str]

a tuple containing the set of added node ids, the set of added link ids, and

list[str]

a dictionary containing any node renamings performed to avoid id collisions with existing nodes

Notes

configuration properties/data will not be loaded if this method is called after the topology is started topology metadata will not be loaded if topology metadata already exists

Source code in hyrrokkin/interfaces/topology_api.py
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
@abstractmethod
def import_from(self, from_path: str, include_data: bool = True, ref: str | None = None) -> Tuple[
    list[str], list[str], dict[str, str]]:
    """
    Merge the nodes and links topology from a YAML file or ZIP file into this topology.  Node and link ids will be renamed to avoid clashes.

    Args:
        from_path: the path to a YAML or ZIP file describing the topology
        include_data: whether or not to include any data referenced in the YAML file
        ref: an optional reference that identifies this request

    Returns:
        a tuple containing the set of added node ids, the set of added link ids, and
        a dictionary containing any node renamings performed to avoid id collisions with existing nodes

    Notes:
        configuration properties/data will not be loaded if this method is called after the topology is started
        topology metadata will not be loaded if topology metadata already exists
    """

inject_input_value(node_id, input_port_name, value) abstractmethod

Inject input values into a node in the topology, via an input port. The port must not be connected.

Parameters:

Name Type Description Default
node_id str

the node id

required
input_port_name str

the name of the node's input port

required
value bytes | list[bytes]

the value to inject - encoded as bytes. For ports that accept multiple connections, a list of bytes may be provided.

required
Source code in hyrrokkin/interfaces/topology_api.py
282
283
284
285
286
287
288
289
290
291
@abstractmethod
def inject_input_value(self, node_id: str, input_port_name: str, value: bytes | list[bytes]):
    """
    Inject input values into a node in the topology, via an input port.  The port must not be connected.

    Args:
        node_id: the node id
        input_port_name: the name of the node's input port
        value: the value to inject - encoded as bytes.  For ports that accept multiple connections, a list of bytes may be provided.
    """

is_paused() abstractmethod

Returns: True if the topology is paused

Source code in hyrrokkin/interfaces/topology_api.py
161
162
163
164
165
@abstractmethod
def is_paused(self):
    """
    Returns: True if the topology is paused
    """

is_started() abstractmethod

Returns: True if the topology is executing

Source code in hyrrokkin/interfaces/topology_api.py
137
138
139
140
141
@abstractmethod
def is_started(self):
    """
    Returns: True if the topology is executing
    """

load(from_file, include_data=True, ref=None) abstractmethod

Merge the nodes and links topology from an opened ZIP file into this topology. Node and link ids will be renamed to avoid clashes.

Parameters:

Name Type Description Default
from_file BytesIO

a file opened in binary mode

required
include_data bool

whether or not to include the data from the file

True

Returns:

Name Type Description
list[str]

a tuple containing the set of added node ids, the set of added link ids, and

list[str]

a dictionary containing any node renamings performed to avoid id collisions with existing nodes

ref dict[str, str]

an optional reference that identifies this request

Notes

configuration properties/data will not be loaded if this method is called after the topology is started topology metadata will not be loaded if topology metadata already exists

Source code in hyrrokkin/interfaces/topology_api.py
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
@abstractmethod
def load(self, from_file: io.BytesIO, include_data: bool = True, ref: str | None = None) -> Tuple[
    list[str], list[str], dict[str, str]]:
    """
    Merge the nodes and links topology from an opened ZIP file into this topology.  Node and link ids will be renamed to avoid clashes.

    Args:
        from_file: a file opened in binary mode
        include_data: whether or not to include the data from the file

    Returns:
        a tuple containing the set of added node ids, the set of added link ids, and
        a dictionary containing any node renamings performed to avoid id collisions with existing nodes
        ref: an optional reference that identifies this request

    Notes:
        configuration properties/data will not be loaded if this method is called after the topology is started
        topology metadata will not be loaded if topology metadata already exists
    """

open_session(session_id=None) abstractmethod

Open a new interactive session

Parameters:

Name Type Description Default
session_id str | None

the identifier of the session or None to generate a new session identifier

None

Returns:

Type Description
str

the session identifier for the opened session

Source code in hyrrokkin/interfaces/topology_api.py
296
297
298
299
300
301
302
303
304
305
306
@abstractmethod
def open_session(self, session_id: str | None = None) -> str:
    """
    Open a new interactive session

    Args:
        session_id: the identifier of the session or None to generate a new session identifier

    Returns:
        the session identifier for the opened session
    """

pause(ref=None) abstractmethod

Pause execution of the topology. Until resume is called, no new nodes will start running.

Parameters:

Name Type Description Default
ref str | None

an optional reference that identifies this request

None
Source code in hyrrokkin/interfaces/topology_api.py
143
144
145
146
147
148
149
150
@abstractmethod
def pause(self, ref: str | None = None):
    """
    Pause execution of the topology.  Until resume is called, no new nodes will start running.

    Args:
        ref: an optional reference that identifies this request
    """

reload_node(node_id, properties, data, ref=None) abstractmethod

Reload a node with new properties and data, triggering re-execution of the node and all downstream nodes if the topology execution has already started

Reloading creates a new instance of the node with the new properties and data

Parameters:

Name Type Description Default
node_id str

the id of the node to reload

required
properties JsonType

the properties to reload

required
data dict[str, bytes]

the data to reload

required
ref str | None

an optional reference that identifies this request

None
Source code in hyrrokkin/interfaces/topology_api.py
203
204
205
206
207
208
209
210
211
212
213
214
215
216
@abstractmethod
def reload_node(self, node_id: str, properties: JsonType, data: dict[str, bytes], ref: str | None = None):
    """
    Reload a node with new properties and data, triggering re-execution of the node and all downstream nodes
    if the topology execution has already started

    Reloading creates a new instance of the node with the new properties and data

    Args:
        node_id: the id of the node to reload
        properties: the properties to reload
        data: the data to reload
        ref: an optional reference that identifies this request
    """

Remove a link from the topology

Parameters:

Name Type Description Default
link_id str

the link's unique identifier

required
ref str | None

an optional reference that identifies this request

None
Source code in hyrrokkin/interfaces/topology_api.py
108
109
110
111
112
113
114
115
116
def remove_link(self, link_id: str, ref: str | None = None):
    """
    Remove a link from the topology

    Args:
        link_id: the link's unique identifier
        ref: an optional reference that identifies this request

    """

remove_node(node_id, ref=None) abstractmethod

Remove a node from the topology

Parameters:

Name Type Description Default
node_id str

the node's unique identifier

required
ref str | None

an optional reference that identifies this request

None
Source code in hyrrokkin/interfaces/topology_api.py
77
78
79
80
81
82
83
84
85
@abstractmethod
def remove_node(self, node_id: str, ref: str | None = None):
    """
    Remove a node from the topology

    Args:
        node_id: the node's unique identifier
        ref: an optional reference that identifies this request
    """

remove_output_listener(node_id, output_port_name) abstractmethod

Remove a listener from a node/port

Parameters:

Name Type Description Default
node_id str

the node id

required
output_port_name str

the name of the node's output port

required
Source code in hyrrokkin/interfaces/topology_api.py
272
273
274
275
276
277
278
279
280
@abstractmethod
def remove_output_listener(self, node_id: str, output_port_name: str):
    """
    Remove a listener from a node/port

    Args:
        node_id: the node id
        output_port_name: the name of the node's output port
    """

restart(ref=None) abstractmethod

Restart execution of the topology

Parameters:

Name Type Description Default
ref str | None

an optional reference that identifies this request

None
Source code in hyrrokkin/interfaces/topology_api.py
167
168
169
170
171
172
173
174
175
176
@abstractmethod
def restart(self, ref: str | None = None):
    """

    Restart execution of the topology

    Args:
        ref: an optional reference that identifies this request

    """

resume(ref=None) abstractmethod

Resume execution of the topology

Parameters:

Name Type Description Default
ref str | None

an optional reference that identifies this request

None
Source code in hyrrokkin/interfaces/topology_api.py
152
153
154
155
156
157
158
159
@abstractmethod
def resume(self, ref: str | None = None):
    """
    Resume execution of the topology

    Args:
        ref: an optional reference that identifies this request
    """

run() abstractmethod

Run the topology until all nodes have completed or failed

Returns: a dictionary containing the error messages returned from any failed nodes

Source code in hyrrokkin/interfaces/topology_api.py
178
179
180
181
182
183
184
185
@abstractmethod
def run(self) -> dict[str, str]:
    """

    Run the topology until all nodes have completed or failed

    Returns: a dictionary containing the error messages returned from any failed nodes
    """

run_task(task_name, input_port_values, output_ports, ref=None) abstractmethod

Run a topology task

Parameters:

Name Type Description Default
task_name str

a descriptive name of the task

required
input_port_values dict[str, bytes | list[bytes]]

map from port identifier (node_id:input_port_name) to a binary-encoded input value or list of values

required
output_ports list[str]

a list of port identifiers (node_id:output_port_name) specifying output ports to return values from

required
ref str | None

an optional reference that identifies this request

None

Returns:

Type Description
tuple[dict[str, bytes], dict[str, str]]

return a tuple of (dict mapping from output port name to value, dict mapping from node id to error string)

Source code in hyrrokkin/interfaces/topology_api.py
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
@abstractmethod
def run_task(self, task_name: str, input_port_values:dict[str,bytes|list[bytes]], output_ports:list[str], ref: str | None = None) -> tuple[
    dict[str, bytes], dict[str, str]]:
    """
    Run a topology task

    Args:
        task_name: a descriptive name of the task
        input_port_values: map from port identifier (node_id:input_port_name) to a binary-encoded input value or list of values
        output_ports: a list of port identifiers (node_id:output_port_name) specifying output ports to return values from
        ref: an optional reference that identifies this request

    Returns:
        return a tuple of (dict mapping from output port name to value, dict mapping from node id to error string)
    """

save(to_file=None, include_data=True) abstractmethod

save the topology to a zip file

Parameters:

Name Type Description Default
to_file BufferedWriter

a file opened in binary mode for writing

None
include_data bool

whether or not to include the data in the file

True
Source code in hyrrokkin/interfaces/topology_api.py
374
375
376
377
378
379
380
381
382
@abstractmethod
def save(self, to_file: io.BufferedWriter = None, include_data: bool = True):
    """
    save the topology to a zip file

    Args:
        to_file: a file opened in binary mode for writing
        include_data: whether or not to include the data in the file
    """

serialise() abstractmethod

Serialise the topology structure to JSON

Returns: JSON-serialisable dictionary

Source code in hyrrokkin/interfaces/topology_api.py
384
385
386
387
388
389
390
391
@abstractmethod
def serialise(self):
    """
    Serialise the topology structure to JSON

    Returns: JSON-serialisable dictionary

    """

set_metadata(metadata, ref=None) abstractmethod

Set metadata for this topology

Parameters:

Name Type Description Default
metadata dict[str, str]

a dictionary containing metadata, consisting of string keys and values.

required
ref str | None

an optional reference that identifies this request

None
Notes

the following keys will be understood by hyrrokkin based tools - version, description, authors

Source code in hyrrokkin/interfaces/topology_api.py
32
33
34
35
36
37
38
39
40
41
42
43
@abstractmethod
def set_metadata(self, metadata: dict[str, str], ref: str | None = None):
    """
    Set metadata for this topology

    Args:
        metadata: a dictionary containing metadata, consisting of string keys and values.
        ref: an optional reference that identifies this request

    Notes:
        the following keys will be understood by hyrrokkin based tools - version, description, authors
    """

start(ref=None) abstractmethod

Start execution of the topology

Parameters:

Name Type Description Default
ref str | None

an optional reference that identifies this request

None
Source code in hyrrokkin/interfaces/topology_api.py
128
129
130
131
132
133
134
135
@abstractmethod
def start(self, ref: str | None = None):
    """
    Start execution of the topology

    Args:
        ref: an optional reference that identifies this request
    """

update_node_metadata(node_id, metadata={}, ref=None) abstractmethod

Update a node's metadata

Parameters:

Name Type Description Default
node_id str | None

the node's requested unique identifier or None

required
metadata dict[str, JsonType]

a dictionary containing the new metadata

{}
ref str | None

an optional reference that identifies this request

None
Source code in hyrrokkin/interfaces/topology_api.py
65
66
67
68
69
70
71
72
73
74
75
@abstractmethod
def update_node_metadata(self, node_id: str | None, metadata: dict[str, JsonType] = {},
                         ref: str | None = None) -> None:
    """
    Update a node's metadata

    Args:
        node_id: the node's requested unique identifier or None
        metadata: a dictionary containing the new metadata
        ref: an optional reference that identifies this request
    """