about summary refs log tree commit diff
path: root/nixos/lib
diff options
context:
space:
mode:
authorRaito Bezarius <masterancpp@gmail.com>2023-10-22 17:54:51 +0200
committerRaito Bezarius <masterancpp@gmail.com>2024-01-25 20:50:20 +0100
commit1196ae6e6bdc3070402fd0875f8f5b69cb4c4a12 (patch)
tree934804696ee2d95cd234c8d60a07c8f5263fbd1c /nixos/lib
parent5545bc83eca8eb5dea16485e1e2d39abbd9f136a (diff)
nixos/lib/test-driver: add `wait_for_qmp_event`
Adds a function to wait for a new QMP event with a model filter
so that you can expect specific type of events with specific payloads.

e.g. a guest-reset-induced shutdown event.
Diffstat (limited to 'nixos/lib')
-rw-r--r--nixos/lib/test-driver/test_driver/machine.py24
1 files changed, 24 insertions, 0 deletions
diff --git a/nixos/lib/test-driver/test_driver/machine.py b/nixos/lib/test-driver/test_driver/machine.py
index 529de41d892a9..cb6bfa6099e97 100644
--- a/nixos/lib/test-driver/test_driver/machine.py
+++ b/nixos/lib/test-driver/test_driver/machine.py
@@ -740,6 +740,30 @@ class Machine:
             self.booted = False
             self.connected = False
 
+    def wait_for_qmp_event(self, event_filter: Callable[[dict[str, Any]], bool], timeout: int = 60 * 10) -> dict[str, Any]:
+        """
+        Wait for a QMP event which you can filter with the `event_filter` function.
+        The function takes as an input a dictionary of the event and if it returns True, we return that event,
+        if it does not, we wait for the next event and retry.
+
+        It will skip all events received in the meantime, if you want to keep them,
+        you have to do the bookkeeping yourself and store them somewhere.
+
+        By default, it will wait up to 10 minutes, `timeout` is in seconds.
+        """
+        if self.qmp_client is None:
+            raise RuntimeError('QMP API is not ready yet, is the VM ready?')
+
+        start = time.time()
+        while True:
+            evt = self.qmp_client.wait_for_event(timeout=timeout)
+            if event_filter(evt):
+                return evt
+
+            elapsed = time.time() - start
+            if elapsed >= timeout:
+                raise TimeoutError
+
     def get_tty_text(self, tty: str) -> str:
         status, output = self.execute(
             f"fold -w$(stty -F /dev/tty{tty} size | "