about summary refs log tree commit diff
path: root/nixos/lib
diff options
context:
space:
mode:
authorJacek Galowicz <jacek@galowicz.de>2023-02-07 09:17:06 +0100
committerGitHub <noreply@github.com>2023-02-07 09:17:06 +0100
commit7f88d9c34c426fd82faf4fd87d15cd88d18fc43c (patch)
tree361819b063e5eb6c4edc0f04a6ea067c5d232e0f /nixos/lib
parent620aa4ff15746980f5106a194b72c9732f45ac5e (diff)
parente375feffbefce4961fc6b97aef83e60f9ed0e705 (diff)
Merge pull request #214910 from rnhmjoj/pr-gnupg-test
nixos/tests/gnupg: init
Diffstat (limited to 'nixos/lib')
-rw-r--r--nixos/lib/test-driver/test_driver/machine.py50
1 files changed, 26 insertions, 24 deletions
diff --git a/nixos/lib/test-driver/test_driver/machine.py b/nixos/lib/test-driver/test_driver/machine.py
index 8f01833bffb42..0db7930f496b2 100644
--- a/nixos/lib/test-driver/test_driver/machine.py
+++ b/nixos/lib/test-driver/test_driver/machine.py
@@ -1,4 +1,4 @@
-from contextlib import _GeneratorContextManager
+from contextlib import _GeneratorContextManager, nullcontext
 from pathlib import Path
 from queue import Queue
 from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple
@@ -406,25 +406,23 @@ class Machine:
         return rootlog.nested(msg, my_attrs)
 
     def wait_for_monitor_prompt(self) -> str:
-        with self.nested("waiting for monitor prompt"):
-            assert self.monitor is not None
-            answer = ""
-            while True:
-                undecoded_answer = self.monitor.recv(1024)
-                if not undecoded_answer:
-                    break
-                answer += undecoded_answer.decode()
-                if answer.endswith("(qemu) "):
-                    break
-            return answer
+        assert self.monitor is not None
+        answer = ""
+        while True:
+            undecoded_answer = self.monitor.recv(1024)
+            if not undecoded_answer:
+                break
+            answer += undecoded_answer.decode()
+            if answer.endswith("(qemu) "):
+                break
+        return answer
 
     def send_monitor_command(self, command: str) -> str:
         self.run_callbacks()
-        with self.nested(f"sending monitor command: {command}"):
-            message = f"{command}\n".encode()
-            assert self.monitor is not None
-            self.monitor.send(message)
-            return self.wait_for_monitor_prompt()
+        message = f"{command}\n".encode()
+        assert self.monitor is not None
+        self.monitor.send(message)
+        return self.wait_for_monitor_prompt()
 
     def wait_for_unit(
         self, unit: str, user: Optional[str] = None, timeout: int = 900
@@ -547,7 +545,7 @@ class Machine:
         self.shell.send("echo ${PIPESTATUS[0]}\n".encode())
         rc = int(self._next_newline_closed_block_from_shell().strip())
 
-        return (rc, output.decode())
+        return (rc, output.decode(errors="replace"))
 
     def shell_interact(self, address: Optional[str] = None) -> None:
         """Allows you to interact with the guest shell for debugging purposes.
@@ -685,9 +683,9 @@ class Machine:
             retry(tty_matches)
 
     def send_chars(self, chars: str, delay: Optional[float] = 0.01) -> None:
-        with self.nested(f"sending keys '{chars}'"):
+        with self.nested(f"sending keys {repr(chars)}"):
             for char in chars:
-                self.send_key(char, delay)
+                self.send_key(char, delay, log=False)
 
     def wait_for_file(self, filename: str) -> None:
         """Waits until the file exists in machine's file system."""
@@ -860,11 +858,15 @@ class Machine:
                 if matches is not None:
                     return
 
-    def send_key(self, key: str, delay: Optional[float] = 0.01) -> None:
+    def send_key(
+        self, key: str, delay: Optional[float] = 0.01, log: Optional[bool] = True
+    ) -> None:
         key = CHAR_TO_KEY.get(key, key)
-        self.send_monitor_command(f"sendkey {key}")
-        if delay is not None:
-            time.sleep(delay)
+        context = self.nested(f"sending key {repr(key)}") if log else nullcontext()
+        with context:
+            self.send_monitor_command(f"sendkey {key}")
+            if delay is not None:
+                time.sleep(delay)
 
     def send_console(self, chars: str) -> None:
         assert self.process