about summary refs log tree commit diff
path: root/nixos/tests/systemd-confinement
diff options
context:
space:
mode:
authoraszlig <aszlig@nix.build>2024-04-24 19:11:06 +0200
committeraszlig <aszlig@nix.build>2024-05-13 00:40:36 +0200
commit51d3f3475c2fb35f0d9682ea600066b1cea459c6 (patch)
tree2b272ca36ddbb2d935da90d9e5fcc6fc07b3a18d /nixos/tests/systemd-confinement
parentf7d026b4312a3f50c44d97be32b0669e8fad2a76 (diff)
nixos/tests/confinement: Run test probes in Python
So far the architecture for the tests was that we would use a systemd
socket unit using the Accept option to start a small shell process where
we can pipe commands into by connecting to the socket created by the
socket unit.

This is unnecessary since we can directly use the code snippets from the
individual subtests and systemd will take care of checking the return
code in case we get any assertions[^1].

Another advantage of this is that tests now run in parallel, so we can
do rather expensive things such as looking in /nix to see whether
anything is writable.

The new assert_permissions() function is the main driver behind this and
allows for a more fine-grained way to check whether we got the right
permissions whilst also ignoring irrelevant things such as read-only
empty directories.

Our previous approach also just did a read-only check, which might be
fine in full-apivfs mode where the attack surface already is large, but
in chroot-only mode we really want to make sure nothing is every
writable.

A downside of the new approach is that currently the unit names are
numbered via lib.imap1, which makes it annoying to track its definition.

[^1]: Speaking of assertions, I wrapped the code to be run with pytest's
      assertion rewriting, so that we get more useful AssertionErrors.

Signed-off-by: aszlig <aszlig@nix.build>
Diffstat (limited to 'nixos/tests/systemd-confinement')
-rw-r--r--nixos/tests/systemd-confinement/checkperms.py187
-rw-r--r--nixos/tests/systemd-confinement/default.nix433
2 files changed, 398 insertions, 222 deletions
diff --git a/nixos/tests/systemd-confinement/checkperms.py b/nixos/tests/systemd-confinement/checkperms.py
new file mode 100644
index 0000000000000..3c7ba279a3d20
--- /dev/null
+++ b/nixos/tests/systemd-confinement/checkperms.py
@@ -0,0 +1,187 @@
+import errno
+import os
+
+from enum import IntEnum
+from pathlib import Path
+
+
+class Accessibility(IntEnum):
+    """
+    The level of accessibility we have on a file or directory.
+
+    This is needed to assess the attack surface on the file system namespace we
+    have within a confined service. Higher levels mean more permissions for the
+    user and thus a bigger attack surface.
+    """
+    NONE = 0
+
+    # Directories can be listed or files can be read.
+    READABLE = 1
+
+    # This is for special file systems such as procfs and for stuff such as
+    # FIFOs or character special files. The reason why this has a lower value
+    # than WRITABLE is because those files are more restricted on what and how
+    # they can be written to.
+    SPECIAL = 2
+
+    # Another special case are sticky directories, which do allow write access
+    # but restrict deletion. This does *not* apply to sticky directories that
+    # are read-only.
+    STICKY = 3
+
+    # Essentially full permissions, the kind of accessibility we want to avoid
+    # in most cases.
+    WRITABLE = 4
+
+    def assert_on(self, path: Path) -> None:
+        """
+        Raise an AssertionError if the given 'path' allows for more
+        accessibility than 'self'.
+        """
+        actual = self.NONE
+
+        if path.is_symlink():
+            actual = self.READABLE
+        elif path.is_dir():
+            writable = True
+
+            dummy_file = path / 'can_i_write'
+            try:
+                dummy_file.touch()
+            except OSError as e:
+                if e.errno in [errno.EROFS, errno.EACCES]:
+                    writable = False
+                else:
+                    raise
+            else:
+                dummy_file.unlink()
+
+            if writable:
+                # The reason why we test this *after* we made sure it's
+                # writable is because we could have a sticky directory where
+                # the current user doesn't have write access.
+                if path.stat().st_mode & 0o1000 == 0o1000:
+                    actual = self.STICKY
+                else:
+                    actual = self.WRITABLE
+            else:
+                actual = self.READABLE
+        elif path.is_file():
+            try:
+                with path.open('rb') as fp:
+                    fp.read(1)
+                actual = self.READABLE
+            except PermissionError:
+                pass
+
+            writable = True
+            try:
+                with path.open('ab') as fp:
+                    fp.write('x')
+                    size = fp.tell()
+                    fp.truncate(size)
+            except PermissionError:
+                writable = False
+            except OSError as e:
+                if e.errno == errno.ETXTBSY:
+                    writable = os.access(path, os.W_OK)
+                elif e.errno == errno.EROFS:
+                    writable = False
+                else:
+                    raise
+
+            # Let's always try to fail towards being writable, so if *either*
+            # access(2) or a real write is successful it's writable. This is to
+            # make sure we don't accidentally introduce no-ops if we have bugs
+            # in the more complicated real write code above.
+            if writable or os.access(path, os.W_OK):
+                actual = self.WRITABLE
+        else:
+            # We need to be very careful when writing to or reading from
+            # special files (eg.  FIFOs), since they can possibly block. So if
+            # it's not a file, just trust that access(2) won't lie.
+            if os.access(path, os.R_OK):
+                actual = self.READABLE
+
+            if os.access(path, os.W_OK):
+                actual = self.SPECIAL
+
+        if actual > self:
+            stat = path.stat()
+            details = ', '.join([
+                f'permissions: {stat.st_mode & 0o7777:o}',
+                f'uid: {stat.st_uid}',
+                f'group: {stat.st_gid}',
+            ])
+
+            raise AssertionError(
+                f'Expected at most {self!r} but got {actual!r} for path'
+                f' {path} ({details}).'
+            )
+
+
+def is_special_fs(path: Path) -> bool:
+    """
+    Check whether the given path truly is a special file system such as procfs
+    or sysfs.
+    """
+    try:
+        if path == Path('/proc'):
+            return (path / 'version').read_text().startswith('Linux')
+        elif path == Path('/sys'):
+            return b'Linux' in (path / 'kernel' / 'notes').read_bytes()
+    except FileNotFoundError:
+        pass
+    return False
+
+
+def is_empty_dir(path: Path) -> bool:
+    try:
+        next(path.iterdir())
+        return False
+    except (StopIteration, PermissionError):
+        return True
+
+
+def _assert_permissions_in_directory(
+    directory: Path,
+    accessibility: Accessibility,
+    subdirs: dict[Path, Accessibility],
+) -> None:
+    accessibility.assert_on(directory)
+
+    for file in directory.iterdir():
+        if is_special_fs(file):
+            msg = f'Got unexpected special filesystem at {file}.'
+            assert subdirs.pop(file) == Accessibility.SPECIAL, msg
+        elif not file.is_symlink() and file.is_dir():
+            subdir_access = subdirs.pop(file, accessibility)
+            if is_empty_dir(file):
+                # Whenever we got an empty directory, we check the permission
+                # constraints on the current directory (except if specified
+                # explicitly in subdirs) because for example if we're non-root
+                # (the constraints of the current directory are thus
+                # Accessibility.READABLE), we really have to make sure that
+                # empty directories are *never* writable.
+                subdir_access.assert_on(file)
+            else:
+                _assert_permissions_in_directory(file, subdir_access, subdirs)
+        else:
+            subdirs.pop(file, accessibility).assert_on(file)
+
+
+def assert_permissions(subdirs: dict[str, Accessibility]) -> None:
+    """
+    Recursively check whether the file system conforms to the accessibility
+    specification we specified via 'subdirs'.
+    """
+    root = Path('/')
+    absolute_subdirs = {root / p: a for p, a in subdirs.items()}
+    _assert_permissions_in_directory(
+        root,
+        Accessibility.WRITABLE if os.getuid() == 0 else Accessibility.READABLE,
+        absolute_subdirs,
+    )
+    for file in absolute_subdirs.keys():
+        msg = f'Expected {file} to exist, but it was nowwhere to be found.'
+        raise AssertionError(msg)
diff --git a/nixos/tests/systemd-confinement/default.nix b/nixos/tests/systemd-confinement/default.nix
index 815cb3a3bbdda..52e5f1620cb92 100644
--- a/nixos/tests/systemd-confinement/default.nix
+++ b/nixos/tests/systemd-confinement/default.nix
@@ -2,19 +2,41 @@ import ../make-test-python.nix {
   name = "systemd-confinement";
 
   nodes.machine = { pkgs, lib, ... }: let
-    testServer = pkgs.writeScript "testserver.sh" ''
-      #!${pkgs.runtimeShell}
-      export PATH=${lib.makeBinPath [ pkgs.coreutils pkgs.findutils ]}
-      ${lib.escapeShellArg pkgs.runtimeShell} 2>&1
-      echo "exit-status:$?"
-    '';
+    testLib = pkgs.python3Packages.buildPythonPackage {
+      name = "confinement-testlib";
+      unpackPhase = ''
+        cat > setup.py <<EOF
+        from setuptools import setup
+        setup(name='confinement-testlib', py_modules=["checkperms"])
+        EOF
+        cp ${./checkperms.py} checkperms.py
+      '';
+    };
+
+    mkTest = name: testScript: pkgs.writers.writePython3 "${name}.py" {
+      libraries = [ pkgs.python3Packages.pytest testLib ];
+    } ''
+      # This runs our test script by using pytest's assertion rewriting, so
+      # that whenever we use "assert <something>", the actual values are
+      # printed rather than getting a generic AssertionError or the need to
+      # pass an explicit assertion error message.
+      import ast
+      from pathlib import Path
+      from _pytest.assertion.rewrite import rewrite_asserts
 
-    testClient = pkgs.writeScriptBin "chroot-exec" ''
-      #!${pkgs.runtimeShell} -e
-      output="$(echo "$@" | nc -NU "/run/test$(< /teststep).sock")"
-      ret="$(echo "$output" | sed -nre '$s/^exit-status:([0-9]+)$/\1/p')"
-      echo "$output" | head -n -1
-      exit "''${ret:-1}"
+      script = Path('${pkgs.writeText "${name}-main.py" ''
+        import errno, os, pytest, signal
+        from subprocess import run
+        from checkperms import Accessibility, assert_permissions
+
+        ${testScript}
+      ''}') # noqa
+      filename = str(script)
+      source = script.read_bytes()
+
+      tree = ast.parse(source, filename=filename)
+      rewrite_asserts(tree, source, filename)
+      exec(compile(tree, filename, 'exec', dont_inherit=True))
     '';
 
     mkTestStep = num: {
@@ -22,28 +44,23 @@ import ../make-test-python.nix {
       testScript,
       config ? {},
       serviceName ? "test${toString num}",
+      rawUnit ? null,
     }: {
-      systemd.sockets.${serviceName} = {
-        description = "Socket for Test Service ${toString num}";
-        wantedBy = [ "sockets.target" ];
-        socketConfig.ListenStream = "/run/test${toString num}.sock";
-        socketConfig.Accept = true;
-      };
-
-      systemd.services."${serviceName}@" = {
-        description = "Confined Test Service ${toString num}";
+      systemd.packages = lib.optional (rawUnit != null) (pkgs.writeTextFile {
+        name = serviceName;
+        destination = "/etc/systemd/system/${serviceName}.service";
+        text = rawUnit;
+      });
+
+      systemd.services.${serviceName} = {
+        inherit description;
+        requiredBy = [ "multi-user.target" ];
         confinement = (config.confinement or {}) // { enable = true; };
         serviceConfig = (config.serviceConfig or {}) // {
-          ExecStart = testServer;
-          StandardInput = "socket";
+          ExecStart = mkTest serviceName testScript;
+          Type = "oneshot";
         };
       } // removeAttrs config [ "confinement" "serviceConfig" ];
-
-      __testSteps = lib.mkOrder num ''
-        with subtest('${lib.escape ["'" "\\"] description}'):
-          machine.succeed("echo ${toString num} > /teststep")
-          ${lib.replaceStrings ["\n"] ["\n  "] testScript}
-      '';
     };
 
   in {
@@ -51,82 +68,90 @@ import ../make-test-python.nix {
       { description = "chroot-only confinement";
         config.confinement.mode = "chroot-only";
         testScript = ''
-          # chroot-exec starts a socket-activated service,
-          # but, upon starting, a systemd system service
-          # calls setup_namespace() which calls base_filesystem_create()
-          # which creates some usual top level directories.
-          # In chroot-only mode, without additional BindPaths= or the like,
-          # they must be empty and thus removable by rmdir.
-          paths = machine.succeed('chroot-exec rmdir /dev /etc /proc /root /sys /usr /var "&&" ls -Am /').strip()
-          assert_eq(paths, "bin, nix, run")
-          uid = machine.succeed('chroot-exec id -u').strip()
-          assert_eq(uid, "0")
-          machine.succeed("chroot-exec chown 65534 /bin")
+          assert_permissions({
+            'bin': Accessibility.WRITABLE,
+            'nix': Accessibility.WRITABLE,
+            'run': Accessibility.WRITABLE,
+          })
+
+          assert os.getuid() == 0
+          os.chown('/bin', 65534, 0)
         '';
       }
       { description = "full confinement with APIVFS";
         testScript = ''
-          machine.succeed('chroot-exec rmdir /etc')
-          machine.fail("chroot-exec chown 65534 /bin")
-          assert_eq(machine.succeed('chroot-exec id -u').strip(), "0")
-          machine.succeed("chroot-exec chown 0 /bin")
+          Path('/etc').rmdir()
+
+          assert_permissions({
+            'bin': Accessibility.WRITABLE,
+            'nix': Accessibility.WRITABLE,
+            'tmp': Accessibility.WRITABLE,
+            'run': Accessibility.WRITABLE,
+
+            'proc': Accessibility.SPECIAL,
+            'sys': Accessibility.SPECIAL,
+            'dev': Accessibility.WRITABLE,
+          })
+
+          bin_gid = Path('/bin').stat().st_gid
+          with pytest.raises(OSError) as excinfo:
+            os.chown('/bin', 65534, bin_gid)
+          assert excinfo.value.errno == errno.EINVAL
+
+          assert os.getuid() == 0
+          os.chown('/bin', 0, 0)
         '';
       }
       { description = "check existence of bind-mounted /etc";
         config.serviceConfig.BindReadOnlyPaths = [ "/etc" ];
         testScript = ''
-          passwd = machine.succeed('chroot-exec cat /etc/passwd').strip()
-          assert len(passwd) > 0, "/etc/passwd must not be empty"
+          assert Path('/etc/passwd').read_text()
         '';
       }
       { description = "check if User/Group really runs as non-root";
         config.serviceConfig.User = "chroot-testuser";
         config.serviceConfig.Group = "chroot-testgroup";
         testScript = ''
-          machine.succeed("chroot-exec ls -l /dev")
-          uid = machine.succeed('chroot-exec id -u').strip()
-          assert uid != "0", "UID of chroot-testuser shouldn't be 0"
-          machine.fail("chroot-exec touch /bin/test")
+          assert list(Path('/dev').iterdir())
+
+          assert os.getuid() != 0
+          assert os.getgid() != 0
+
+          with pytest.raises(PermissionError):
+            Path('/bin/test').touch()
         '';
       }
       { description = "check if DynamicUser is working in full-apivfs mode";
         config.confinement.mode = "full-apivfs";
         config.serviceConfig.DynamicUser = true;
         testScript = ''
-          machine.succeed("chroot-exec ls -l /dev")
-          paths = machine.succeed('chroot-exec find / -path /dev/"\\*" -prune -o -path /nix/"\\*" -prune -o -path /proc/"\\*" -prune -o -path /sys/"\\*" -prune -o -print || test $? = 1')
-          assert_eq(
-            '\n'.join(sorted(paths.split('\n'))),
-            dedent("""
-              /
-              /bin
-              /bin/sh
-              /dev
-              /etc
-              /nix
-              /proc
-              /root
-              /run
-              /run/host
-              /run/host/.os-release-stage
-              /run/host/.os-release-stage/os-release
-              /run/host/os-release
-              /run/systemd
-              /run/systemd/incoming
-              /sys
-              /tmp
-              /usr
-              /var
-              /var/tmp
-              find: '/root': Permission denied
-              find: '/run/systemd/incoming': Permission denied
-            """).rstrip()
-          )
-          uid = machine.succeed('chroot-exec id -u').strip()
-          assert uid != "0", "UID of a DynamicUser shouldn't be 0"
-          machine.fail("chroot-exec touch /bin/test")
-          # DynamicUser=true implies ProtectSystem=strict
-          machine.fail("chroot-exec touch /etc/test")
+          assert_permissions({
+            'bin': Accessibility.READABLE,
+            'nix': Accessibility.READABLE,
+            'tmp': Accessibility.WRITABLE,
+            'run': Accessibility.STICKY,
+
+            'proc': Accessibility.SPECIAL,
+            'sys': Accessibility.SPECIAL,
+
+            'dev': Accessibility.SPECIAL,
+            'dev/shm': Accessibility.STICKY,
+            'dev/mqueue': Accessibility.STICKY,
+
+            'var': Accessibility.READABLE,
+            'var/tmp': Accessibility.WRITABLE,
+          })
+
+          assert os.getuid() != 0
+          assert os.getgid() != 0
+
+          with pytest.raises(OSError) as excinfo:
+            Path('/bin/test').touch()
+          assert excinfo.value.errno == errno.EROFS
+
+          with pytest.raises(OSError) as excinfo:
+            Path('/etc/test').touch()
+          assert excinfo.value.errno == errno.EROFS
         '';
       }
       { description = "check if DynamicUser and PrivateTmp=false are working in full-apivfs mode";
@@ -134,69 +159,47 @@ import ../make-test-python.nix {
         config.serviceConfig.DynamicUser = true;
         config.serviceConfig.PrivateTmp = false;
         testScript = ''
-          machine.succeed("chroot-exec ls -l /dev")
-          paths = machine.succeed('chroot-exec find / -path /dev/"\\*" -prune -o -path /nix/"\\*" -prune -o -path /proc/"\\*" -prune -o -path /sys/"\\*" -prune -o -print || test $? = 1')
-          assert_eq(
-            '\n'.join(sorted(paths.split('\n'))),
-            dedent("""
-              /
-              /bin
-              /bin/sh
-              /dev
-              /etc
-              /nix
-              /proc
-              /root
-              /run
-              /run/host
-              /run/host/.os-release-stage
-              /run/host/.os-release-stage/os-release
-              /run/host/os-release
-              /run/systemd
-              /run/systemd/incoming
-              /sys
-              /usr
-              /var
-              find: '/root': Permission denied
-              find: '/run/systemd/incoming': Permission denied
-            """).rstrip()
-          )
-          uid = machine.succeed('chroot-exec id -u').strip()
-          assert uid != "0", "UID of a DynamicUser shouldn't be 0"
-          machine.fail("chroot-exec touch /bin/test")
-          # DynamicUser=true implies ProtectSystem=strict
-          machine.fail("chroot-exec touch /etc/test")
+          assert_permissions({
+            'bin': Accessibility.READABLE,
+            'nix': Accessibility.READABLE,
+            'run': Accessibility.STICKY,
+
+            'proc': Accessibility.SPECIAL,
+            'sys': Accessibility.SPECIAL,
+
+            'dev': Accessibility.SPECIAL,
+            'dev/shm': Accessibility.STICKY,
+            'dev/mqueue': Accessibility.STICKY,
+          })
+
+          assert os.getuid() != 0
+          assert os.getgid() != 0
+
+          with pytest.raises(OSError) as excinfo:
+            Path('/bin/test').touch()
+          assert excinfo.value.errno == errno.EROFS
+
+          with pytest.raises(OSError) as excinfo:
+            Path('/etc/test').touch()
+          assert excinfo.value.errno == errno.EROFS
         '';
       }
       { description = "check if DynamicUser is working in chroot-only mode";
         config.confinement.mode = "chroot-only";
         config.serviceConfig.DynamicUser = true;
         testScript = ''
-          paths = machine.succeed('chroot-exec find / -path /nix/"\\*" -prune -o -print || test $? = 1')
-          assert_eq(
-            '\n'.join(sorted(paths.split('\n'))),
-            dedent("""
-              /
-              /bin
-              /bin/sh
-              /dev
-              /etc
-              /nix
-              /proc
-              /root
-              /run
-              /run/systemd
-              /run/systemd/incoming
-              /sys
-              /usr
-              /var
-              find: '/root': Permission denied
-              find: '/run/systemd/incoming': Permission denied
-            """).rstrip()
-          )
-          uid = machine.succeed('chroot-exec id -u').strip()
-          assert uid != "0", "UID of a DynamicUser shouldn't be 0"
-          machine.fail("chroot-exec touch /bin/test")
+          assert_permissions({
+            'bin': Accessibility.READABLE,
+            'nix': Accessibility.READABLE,
+            'run': Accessibility.READABLE,
+          })
+
+          assert os.getuid() != 0
+          assert os.getgid() != 0
+
+          with pytest.raises(OSError) as excinfo:
+            Path('/bin/test').touch()
+          assert excinfo.value.errno == errno.EROFS
         '';
       }
       { description = "check if DynamicUser and PrivateTmp=true are working in chroot-only mode";
@@ -204,124 +207,119 @@ import ../make-test-python.nix {
         config.serviceConfig.DynamicUser = true;
         config.serviceConfig.PrivateTmp = true;
         testScript = ''
-          paths = machine.succeed('chroot-exec find / -path /nix/"\\*" -prune -o -print || test $? = 1')
-          assert_eq(
-            '\n'.join(sorted(paths.split('\n'))),
-            dedent("""
-              /
-              /bin
-              /bin/sh
-              /dev
-              /etc
-              /nix
-              /proc
-              /root
-              /run
-              /run/systemd
-              /run/systemd/incoming
-              /sys
-              /tmp
-              /usr
-              /var
-              /var/tmp
-              find: '/root': Permission denied
-              find: '/run/systemd/incoming': Permission denied
-            """).rstrip()
-          )
-          uid = machine.succeed('chroot-exec id -u').strip()
-          assert uid != "0", "UID of a DynamicUser shouldn't be 0"
-          machine.fail("chroot-exec touch /bin/test")
+          assert_permissions({
+            'bin': Accessibility.READABLE,
+            'nix': Accessibility.READABLE,
+            'run': Accessibility.READABLE,
+            'tmp': Accessibility.WRITABLE,
+
+            'var': Accessibility.READABLE,
+            'var/tmp': Accessibility.WRITABLE,
+          })
+
+          assert os.getuid() != 0
+          assert os.getgid() != 0
+
+          with pytest.raises(OSError) as excinfo:
+            Path('/bin/test').touch()
+          assert excinfo.value.errno == errno.EROFS
         '';
       }
       (let
         symlink = pkgs.runCommand "symlink" {
-          target = pkgs.writeText "symlink-target" "got me\n";
+          target = pkgs.writeText "symlink-target" "got me";
         } "ln -s \"$target\" \"$out\"";
       in {
         description = "check if symlinks are properly bind-mounted";
         config.confinement.packages = lib.singleton symlink;
         testScript = ''
-          machine.succeed("chroot-exec rmdir /etc")
-          text = machine.succeed('chroot-exec cat ${symlink}').strip()
-          assert_eq(text, "got me")
+          Path('/etc').rmdir()
+          assert Path('${symlink}').read_text() == 'got me'
         '';
       })
       { description = "check if StateDirectory works";
         config.serviceConfig.User = "chroot-testuser";
         config.serviceConfig.Group = "chroot-testgroup";
         config.serviceConfig.StateDirectory = "testme";
+
+        # We restart on purpose here since we want to check whether the state
+        # directory actually persists.
+        config.serviceConfig.Restart = "on-failure";
+        config.serviceConfig.RestartMode = "direct";
+
         testScript = ''
-          machine.succeed("chroot-exec touch /tmp/canary")
-          machine.succeed('chroot-exec "echo works > /var/lib/testme/foo"')
-          machine.succeed('test "$(< /var/lib/testme/foo)" = works')
-          machine.succeed("test ! -e /tmp/canary")
+          assert not Path('/tmp/canary').exists()
+          Path('/tmp/canary').touch()
+
+          if (foo := Path('/var/lib/testme/foo')).exists():
+            assert Path('/var/lib/testme/foo').read_text() == 'works'
+          else:
+            Path('/var/lib/testme/foo').write_text('works')
+            print('<4>Exiting with failure to check persistence on restart.')
+            raise SystemExit(1)
         '';
       }
       { description = "check if /bin/sh works";
         testScript = ''
-          machine.succeed(
-            "chroot-exec test -e /bin/sh",
-            'test "$(chroot-exec \'/bin/sh -c "echo bar"\')" = bar',
+          assert Path('/bin/sh').exists()
+
+          result = run(
+            ['/bin/sh', '-c', 'echo -n bar'],
+            capture_output=True,
+            check=True,
           )
+          assert result.stdout == b'bar'
         '';
       }
       { description = "check if suppressing /bin/sh works";
         config.confinement.binSh = null;
         testScript = ''
-          machine.succeed(
-            'chroot-exec test ! -e /bin/sh',
-            'test "$(chroot-exec \'/bin/sh -c "echo foo"\')" != foo',
-          )
+          assert not Path('/bin/sh').exists()
+          with pytest.raises(FileNotFoundError):
+            run(['/bin/sh', '-c', 'echo foo'])
         '';
       }
       { description = "check if we can set /bin/sh to something different";
         config.confinement.binSh = "${pkgs.hello}/bin/hello";
         testScript = ''
-          machine.succeed("chroot-exec test -e /bin/sh")
-          machine.succeed('test "$(chroot-exec /bin/sh -g foo)" = foo')
+          assert Path('/bin/sh').exists()
+          result = run(
+            ['/bin/sh', '-g', 'foo'],
+            capture_output=True,
+            check=True,
+          )
+          assert result.stdout == b'foo\n'
         '';
       }
       { description = "check if only Exec* dependencies are included";
-        config.environment.FOOBAR = pkgs.writeText "foobar" "eek\n";
+        config.environment.FOOBAR = pkgs.writeText "foobar" "eek";
         testScript = ''
-          machine.succeed('test "$(chroot-exec \'cat "$FOOBAR"\')" != eek')
+          with pytest.raises(FileNotFoundError):
+            Path(os.environ['FOOBAR']).read_text()
         '';
       }
-      { description = "check if all unit dependencies are included";
-        config.environment.FOOBAR = pkgs.writeText "foobar" "eek\n";
+      { description = "check if fullUnit includes all dependencies";
+        config.environment.FOOBAR = pkgs.writeText "foobar" "eek";
         config.confinement.fullUnit = true;
         testScript = ''
-          machine.succeed('test "$(chroot-exec \'cat "$FOOBAR"\')" = eek')
+          assert Path(os.environ['FOOBAR']).read_text() == 'eek'
         '';
       }
       { description = "check if shipped unit file still works";
-        serviceName = "shipped-unitfile";
         config.confinement.mode = "chroot-only";
+        rawUnit = ''
+          [Service]
+          SystemCallFilter=~kill
+          SystemCallErrorNumber=ELOOP
+        '';
         testScript = ''
-          machine.succeed(
-            'chroot-exec \'kill -9 $$ 2>&1 || :\' | '
-            'grep -q "Too many levels of symbolic links"'
-          )
+          with pytest.raises(OSError) as excinfo:
+            os.kill(os.getpid(), signal.SIGKILL)
+          assert excinfo.value.errno == errno.ELOOP
         '';
       }
     ];
 
-    options.__testSteps = lib.mkOption {
-      type = lib.types.lines;
-      description = "All of the test steps combined as a single script.";
-    };
-
-    config.environment.systemPackages = lib.singleton testClient;
-    config.systemd.packages = lib.singleton (pkgs.writeTextFile {
-      name = "shipped-unitfile";
-      destination = "/etc/systemd/system/shipped-unitfile@.service";
-      text = ''
-        [Service]
-        SystemCallFilter=~kill
-        SystemCallErrorNumber=ELOOP
-      '';
-    });
-
     config.users.groups.chroot-testgroup = {};
     config.users.users.chroot-testuser = {
       isSystemUser = true;
@@ -330,16 +328,7 @@ import ../make-test-python.nix {
     };
   };
 
-  testScript = { nodes, ... }: ''
-    from textwrap import dedent
-    import difflib
-
-    def assert_eq(got, expected):
-      if got != expected:
-        diff = difflib.unified_diff(got.splitlines(keepends=True), expected.splitlines(keepends=True))
-        print("".join(diff))
-      assert got == expected, f"{got} != {expected}"
-
+  testScript = ''
     machine.wait_for_unit("multi-user.target")
-  '' + nodes.machine.__testSteps;
+  '';
 }