about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--nixos/tests/systemd-confinement/checkperms.py187
-rw-r--r--nixos/tests/systemd-confinement/default.nix433
2 files changed, 398 insertions, 222 deletions
diff --git a/nixos/tests/systemd-confinement/checkperms.py b/nixos/tests/systemd-confinement/checkperms.py
new file mode 100644
index 0000000000000..3c7ba279a3d20
--- /dev/null
+++ b/nixos/tests/systemd-confinement/checkperms.py
@@ -0,0 +1,187 @@
+import errno
+import os
+
+from enum import IntEnum
+from pathlib import Path
+
+
+class Accessibility(IntEnum):
+    """
+    The level of accessibility we have on a file or directory.
+
+    This is needed to assess the attack surface on the file system namespace we
+    have within a confined service. Higher levels mean more permissions for the
+    user and thus a bigger attack surface.
+    """
+    NONE = 0
+
+    # Directories can be listed or files can be read.
+    READABLE = 1
+
+    # This is for special file systems such as procfs and for stuff such as
+    # FIFOs or character special files. The reason why this has a lower value
+    # than WRITABLE is because those files are more restricted on what and how
+    # they can be written to.
+    SPECIAL = 2
+
+    # Another special case are sticky directories, which do allow write access
+    # but restrict deletion. This does *not* apply to sticky directories that
+    # are read-only.
+    STICKY = 3
+
+    # Essentially full permissions, the kind of accessibility we want to avoid
+    # in most cases.
+    WRITABLE = 4
+
+    def assert_on(self, path: Path) -> None:
+        """
+        Raise an AssertionError if the given 'path' allows for more
+        accessibility than 'self'.
+        """
+        actual = self.NONE
+
+        if path.is_symlink():
+            actual = self.READABLE
+        elif path.is_dir():
+            writable = True
+
+            dummy_file = path / 'can_i_write'
+            try:
+                dummy_file.touch()
+            except OSError as e:
+                if e.errno in [errno.EROFS, errno.EACCES]:
+                    writable = False
+                else:
+                    raise
+            else:
+                dummy_file.unlink()
+
+            if writable:
+                # The reason why we test this *after* we made sure it's
+                # writable is because we could have a sticky directory where
+                # the current user doesn't have write access.
+                if path.stat().st_mode & 0o1000 == 0o1000:
+                    actual = self.STICKY
+                else:
+                    actual = self.WRITABLE
+            else:
+                actual = self.READABLE
+        elif path.is_file():
+            try:
+                with path.open('rb') as fp:
+                    fp.read(1)
+                actual = self.READABLE
+            except PermissionError:
+                pass
+
+            writable = True
+            try:
+                with path.open('ab') as fp:
+                    fp.write('x')
+                    size = fp.tell()
+                    fp.truncate(size)
+            except PermissionError:
+                writable = False
+            except OSError as e:
+                if e.errno == errno.ETXTBSY:
+                    writable = os.access(path, os.W_OK)
+                elif e.errno == errno.EROFS:
+                    writable = False
+                else:
+                    raise
+
+            # Let's always try to fail towards being writable, so if *either*
+            # access(2) or a real write is successful it's writable. This is to
+            # make sure we don't accidentally introduce no-ops if we have bugs
+            # in the more complicated real write code above.
+            if writable or os.access(path, os.W_OK):
+                actual = self.WRITABLE
+        else:
+            # We need to be very careful when writing to or reading from
+            # special files (eg.  FIFOs), since they can possibly block. So if
+            # it's not a file, just trust that access(2) won't lie.
+            if os.access(path, os.R_OK):
+                actual = self.READABLE
+
+            if os.access(path, os.W_OK):
+                actual = self.SPECIAL
+
+        if actual > self:
+            stat = path.stat()
+            details = ', '.join([
+                f'permissions: {stat.st_mode & 0o7777:o}',
+                f'uid: {stat.st_uid}',
+                f'group: {stat.st_gid}',
+            ])
+
+            raise AssertionError(
+                f'Expected at most {self!r} but got {actual!r} for path'
+                f' {path} ({details}).'
+            )
+
+
+def is_special_fs(path: Path) -> bool:
+    """
+    Check whether the given path truly is a special file system such as procfs
+    or sysfs.
+    """
+    try:
+        if path == Path('/proc'):
+            return (path / 'version').read_text().startswith('Linux')
+        elif path == Path('/sys'):
+            return b'Linux' in (path / 'kernel' / 'notes').read_bytes()
+    except FileNotFoundError:
+        pass
+    return False
+
+
+def is_empty_dir(path: Path) -> bool:
+    try:
+        next(path.iterdir())
+        return False
+    except (StopIteration, PermissionError):
+        return True
+
+
+def _assert_permissions_in_directory(
+    directory: Path,
+    accessibility: Accessibility,
+    subdirs: dict[Path, Accessibility],
+) -> None:
+    accessibility.assert_on(directory)
+
+    for file in directory.iterdir():
+        if is_special_fs(file):
+            msg = f'Got unexpected special filesystem at {file}.'
+            assert subdirs.pop(file) == Accessibility.SPECIAL, msg
+        elif not file.is_symlink() and file.is_dir():
+            subdir_access = subdirs.pop(file, accessibility)
+            if is_empty_dir(file):
+                # Whenever we got an empty directory, we check the permission
+                # constraints on the current directory (except if specified
+                # explicitly in subdirs) because for example if we're non-root
+                # (the constraints of the current directory are thus
+                # Accessibility.READABLE), we really have to make sure that
+                # empty directories are *never* writable.
+                subdir_access.assert_on(file)
+            else:
+                _assert_permissions_in_directory(file, subdir_access, subdirs)
+        else:
+            subdirs.pop(file, accessibility).assert_on(file)
+
+
+def assert_permissions(subdirs: dict[str, Accessibility]) -> None:
+    """
+    Recursively check whether the file system conforms to the accessibility
+    specification we specified via 'subdirs'.
+    """
+    root = Path('/')
+    absolute_subdirs = {root / p: a for p, a in subdirs.items()}
+    _assert_permissions_in_directory(
+        root,
+        Accessibility.WRITABLE if os.getuid() == 0 else Accessibility.READABLE,
+        absolute_subdirs,
+    )
+    for file in absolute_subdirs.keys():
+        msg = f'Expected {file} to exist, but it was nowwhere to be found.'
+        raise AssertionError(msg)
diff --git a/nixos/tests/systemd-confinement/default.nix b/nixos/tests/systemd-confinement/default.nix
index 815cb3a3bbdda..52e5f1620cb92 100644
--- a/nixos/tests/systemd-confinement/default.nix
+++ b/nixos/tests/systemd-confinement/default.nix
@@ -2,19 +2,41 @@ import ../make-test-python.nix {
   name = "systemd-confinement";
 
   nodes.machine = { pkgs, lib, ... }: let
-    testServer = pkgs.writeScript "testserver.sh" ''
-      #!${pkgs.runtimeShell}
-      export PATH=${lib.makeBinPath [ pkgs.coreutils pkgs.findutils ]}
-      ${lib.escapeShellArg pkgs.runtimeShell} 2>&1
-      echo "exit-status:$?"
-    '';
+    testLib = pkgs.python3Packages.buildPythonPackage {
+      name = "confinement-testlib";
+      unpackPhase = ''
+        cat > setup.py <<EOF
+        from setuptools import setup
+        setup(name='confinement-testlib', py_modules=["checkperms"])
+        EOF
+        cp ${./checkperms.py} checkperms.py
+      '';
+    };
+
+    mkTest = name: testScript: pkgs.writers.writePython3 "${name}.py" {
+      libraries = [ pkgs.python3Packages.pytest testLib ];
+    } ''
+      # This runs our test script by using pytest's assertion rewriting, so
+      # that whenever we use "assert <something>", the actual values are
+      # printed rather than getting a generic AssertionError or the need to
+      # pass an explicit assertion error message.
+      import ast
+      from pathlib import Path
+      from _pytest.assertion.rewrite import rewrite_asserts
 
-    testClient = pkgs.writeScriptBin "chroot-exec" ''
-      #!${pkgs.runtimeShell} -e
-      output="$(echo "$@" | nc -NU "/run/test$(< /teststep).sock")"
-      ret="$(echo "$output" | sed -nre '$s/^exit-status:([0-9]+)$/\1/p')"
-      echo "$output" | head -n -1
-      exit "''${ret:-1}"
+      script = Path('${pkgs.writeText "${name}-main.py" ''
+        import errno, os, pytest, signal
+        from subprocess import run
+        from checkperms import Accessibility, assert_permissions
+
+        ${testScript}
+      ''}') # noqa
+      filename = str(script)
+      source = script.read_bytes()
+
+      tree = ast.parse(source, filename=filename)
+      rewrite_asserts(tree, source, filename)
+      exec(compile(tree, filename, 'exec', dont_inherit=True))
     '';
 
     mkTestStep = num: {
@@ -22,28 +44,23 @@ import ../make-test-python.nix {
       testScript,
       config ? {},
       serviceName ? "test${toString num}",
+      rawUnit ? null,
     }: {
-      systemd.sockets.${serviceName} = {
-        description = "Socket for Test Service ${toString num}";
-        wantedBy = [ "sockets.target" ];
-        socketConfig.ListenStream = "/run/test${toString num}.sock";
-        socketConfig.Accept = true;
-      };
-
-      systemd.services."${serviceName}@" = {
-        description = "Confined Test Service ${toString num}";
+      systemd.packages = lib.optional (rawUnit != null) (pkgs.writeTextFile {
+        name = serviceName;
+        destination = "/etc/systemd/system/${serviceName}.service";
+        text = rawUnit;
+      });
+
+      systemd.services.${serviceName} = {
+        inherit description;
+        requiredBy = [ "multi-user.target" ];
         confinement = (config.confinement or {}) // { enable = true; };
         serviceConfig = (config.serviceConfig or {}) // {
-          ExecStart = testServer;
-          StandardInput = "socket";
+          ExecStart = mkTest serviceName testScript;
+          Type = "oneshot";
         };
       } // removeAttrs config [ "confinement" "serviceConfig" ];
-
-      __testSteps = lib.mkOrder num ''
-        with subtest('${lib.escape ["'" "\\"] description}'):
-          machine.succeed("echo ${toString num} > /teststep")
-          ${lib.replaceStrings ["\n"] ["\n  "] testScript}
-      '';
     };
 
   in {
@@ -51,82 +68,90 @@ import ../make-test-python.nix {
       { description = "chroot-only confinement";
         config.confinement.mode = "chroot-only";
         testScript = ''
-          # chroot-exec starts a socket-activated service,
-          # but, upon starting, a systemd system service
-          # calls setup_namespace() which calls base_filesystem_create()
-          # which creates some usual top level directories.
-          # In chroot-only mode, without additional BindPaths= or the like,
-          # they must be empty and thus removable by rmdir.
-          paths = machine.succeed('chroot-exec rmdir /dev /etc /proc /root /sys /usr /var "&&" ls -Am /').strip()
-          assert_eq(paths, "bin, nix, run")
-          uid = machine.succeed('chroot-exec id -u').strip()
-          assert_eq(uid, "0")
-          machine.succeed("chroot-exec chown 65534 /bin")
+          assert_permissions({
+            'bin': Accessibility.WRITABLE,
+            'nix': Accessibility.WRITABLE,
+            'run': Accessibility.WRITABLE,
+          })
+
+          assert os.getuid() == 0
+          os.chown('/bin', 65534, 0)
         '';
       }
       { description = "full confinement with APIVFS";
         testScript = ''
-          machine.succeed('chroot-exec rmdir /etc')
-          machine.fail("chroot-exec chown 65534 /bin")
-          assert_eq(machine.succeed('chroot-exec id -u').strip(), "0")
-          machine.succeed("chroot-exec chown 0 /bin")
+          Path('/etc').rmdir()
+
+          assert_permissions({
+            'bin': Accessibility.WRITABLE,
+            'nix': Accessibility.WRITABLE,
+            'tmp': Accessibility.WRITABLE,
+            'run': Accessibility.WRITABLE,
+
+            'proc': Accessibility.SPECIAL,
+            'sys': Accessibility.SPECIAL,
+            'dev': Accessibility.WRITABLE,
+          })
+
+          bin_gid = Path('/bin').stat().st_gid
+          with pytest.raises(OSError) as excinfo:
+            os.chown('/bin', 65534, bin_gid)
+          assert excinfo.value.errno == errno.EINVAL
+
+          assert os.getuid() == 0
+          os.chown('/bin', 0, 0)
         '';
       }
       { description = "check existence of bind-mounted /etc";
         config.serviceConfig.BindReadOnlyPaths = [ "/etc" ];
         testScript = ''
-          passwd = machine.succeed('chroot-exec cat /etc/passwd').strip()
-          assert len(passwd) > 0, "/etc/passwd must not be empty"
+          assert Path('/etc/passwd').read_text()
         '';
       }
       { description = "check if User/Group really runs as non-root";
         config.serviceConfig.User = "chroot-testuser";
         config.serviceConfig.Group = "chroot-testgroup";
         testScript = ''
-          machine.succeed("chroot-exec ls -l /dev")
-          uid = machine.succeed('chroot-exec id -u').strip()
-          assert uid != "0", "UID of chroot-testuser shouldn't be 0"
-          machine.fail("chroot-exec touch /bin/test")
+          assert list(Path('/dev').iterdir())
+
+          assert os.getuid() != 0
+          assert os.getgid() != 0
+
+          with pytest.raises(PermissionError):
+            Path('/bin/test').touch()
         '';
       }
       { description = "check if DynamicUser is working in full-apivfs mode";
         config.confinement.mode = "full-apivfs";
         config.serviceConfig.DynamicUser = true;
         testScript = ''
-          machine.succeed("chroot-exec ls -l /dev")
-          paths = machine.succeed('chroot-exec find / -path /dev/"\\*" -prune -o -path /nix/"\\*" -prune -o -path /proc/"\\*" -prune -o -path /sys/"\\*" -prune -o -print || test $? = 1')
-          assert_eq(
-            '\n'.join(sorted(paths.split('\n'))),
-            dedent("""
-              /
-              /bin
-              /bin/sh
-              /dev
-              /etc
-              /nix
-              /proc
-              /root
-              /run
-              /run/host
-              /run/host/.os-release-stage
-              /run/host/.os-release-stage/os-release
-              /run/host/os-release
-              /run/systemd
-              /run/systemd/incoming
-              /sys
-              /tmp
-              /usr
-              /var
-              /var/tmp
-              find: '/root': Permission denied
-              find: '/run/systemd/incoming': Permission denied
-            """).rstrip()
-          )
-          uid = machine.succeed('chroot-exec id -u').strip()
-          assert uid != "0", "UID of a DynamicUser shouldn't be 0"
-          machine.fail("chroot-exec touch /bin/test")
-          # DynamicUser=true implies ProtectSystem=strict
-          machine.fail("chroot-exec touch /etc/test")
+          assert_permissions({
+            'bin': Accessibility.READABLE,
+            'nix': Accessibility.READABLE,
+            'tmp': Accessibility.WRITABLE,
+            'run': Accessibility.STICKY,
+
+            'proc': Accessibility.SPECIAL,
+            'sys': Accessibility.SPECIAL,
+
+            'dev': Accessibility.SPECIAL,
+            'dev/shm': Accessibility.STICKY,
+            'dev/mqueue': Accessibility.STICKY,
+
+            'var': Accessibility.READABLE,
+            'var/tmp': Accessibility.WRITABLE,
+          })
+
+          assert os.getuid() != 0
+          assert os.getgid() != 0
+
+          with pytest.raises(OSError) as excinfo:
+            Path('/bin/test').touch()
+          assert excinfo.value.errno == errno.EROFS
+
+          with pytest.raises(OSError) as excinfo:
+            Path('/etc/test').touch()
+          assert excinfo.value.errno == errno.EROFS
         '';
       }
       { description = "check if DynamicUser and PrivateTmp=false are working in full-apivfs mode";
@@ -134,69 +159,47 @@ import ../make-test-python.nix {
         config.serviceConfig.DynamicUser = true;
         config.serviceConfig.PrivateTmp = false;
         testScript = ''
-          machine.succeed("chroot-exec ls -l /dev")
-          paths = machine.succeed('chroot-exec find / -path /dev/"\\*" -prune -o -path /nix/"\\*" -prune -o -path /proc/"\\*" -prune -o -path /sys/"\\*" -prune -o -print || test $? = 1')
-          assert_eq(
-            '\n'.join(sorted(paths.split('\n'))),
-            dedent("""
-              /
-              /bin
-              /bin/sh
-              /dev
-              /etc
-              /nix
-              /proc
-              /root
-              /run
-              /run/host
-              /run/host/.os-release-stage
-              /run/host/.os-release-stage/os-release
-              /run/host/os-release
-              /run/systemd
-              /run/systemd/incoming
-              /sys
-              /usr
-              /var
-              find: '/root': Permission denied
-              find: '/run/systemd/incoming': Permission denied
-            """).rstrip()
-          )
-          uid = machine.succeed('chroot-exec id -u').strip()
-          assert uid != "0", "UID of a DynamicUser shouldn't be 0"
-          machine.fail("chroot-exec touch /bin/test")
-          # DynamicUser=true implies ProtectSystem=strict
-          machine.fail("chroot-exec touch /etc/test")
+          assert_permissions({
+            'bin': Accessibility.READABLE,
+            'nix': Accessibility.READABLE,
+            'run': Accessibility.STICKY,
+
+            'proc': Accessibility.SPECIAL,
+            'sys': Accessibility.SPECIAL,
+
+            'dev': Accessibility.SPECIAL,
+            'dev/shm': Accessibility.STICKY,
+            'dev/mqueue': Accessibility.STICKY,
+          })
+
+          assert os.getuid() != 0
+          assert os.getgid() != 0
+
+          with pytest.raises(OSError) as excinfo:
+            Path('/bin/test').touch()
+          assert excinfo.value.errno == errno.EROFS
+
+          with pytest.raises(OSError) as excinfo:
+            Path('/etc/test').touch()
+          assert excinfo.value.errno == errno.EROFS
         '';
       }
       { description = "check if DynamicUser is working in chroot-only mode";
         config.confinement.mode = "chroot-only";
         config.serviceConfig.DynamicUser = true;
         testScript = ''
-          paths = machine.succeed('chroot-exec find / -path /nix/"\\*" -prune -o -print || test $? = 1')
-          assert_eq(
-            '\n'.join(sorted(paths.split('\n'))),
-            dedent("""
-              /
-              /bin
-              /bin/sh
-              /dev
-              /etc
-              /nix
-              /proc
-              /root
-              /run
-              /run/systemd
-              /run/systemd/incoming
-              /sys
-              /usr
-              /var
-              find: '/root': Permission denied
-              find: '/run/systemd/incoming': Permission denied
-            """).rstrip()
-          )
-          uid = machine.succeed('chroot-exec id -u').strip()
-          assert uid != "0", "UID of a DynamicUser shouldn't be 0"
-          machine.fail("chroot-exec touch /bin/test")
+          assert_permissions({
+            'bin': Accessibility.READABLE,
+            'nix': Accessibility.READABLE,
+            'run': Accessibility.READABLE,
+          })
+
+          assert os.getuid() != 0
+          assert os.getgid() != 0
+
+          with pytest.raises(OSError) as excinfo:
+            Path('/bin/test').touch()
+          assert excinfo.value.errno == errno.EROFS
         '';
       }
       { description = "check if DynamicUser and PrivateTmp=true are working in chroot-only mode";
@@ -204,124 +207,119 @@ import ../make-test-python.nix {
         config.serviceConfig.DynamicUser = true;
         config.serviceConfig.PrivateTmp = true;
         testScript = ''
-          paths = machine.succeed('chroot-exec find / -path /nix/"\\*" -prune -o -print || test $? = 1')
-          assert_eq(
-            '\n'.join(sorted(paths.split('\n'))),
-            dedent("""
-              /
-              /bin
-              /bin/sh
-              /dev
-              /etc
-              /nix
-              /proc
-              /root
-              /run
-              /run/systemd
-              /run/systemd/incoming
-              /sys
-              /tmp
-              /usr
-              /var
-              /var/tmp
-              find: '/root': Permission denied
-              find: '/run/systemd/incoming': Permission denied
-            """).rstrip()
-          )
-          uid = machine.succeed('chroot-exec id -u').strip()
-          assert uid != "0", "UID of a DynamicUser shouldn't be 0"
-          machine.fail("chroot-exec touch /bin/test")
+          assert_permissions({
+            'bin': Accessibility.READABLE,
+            'nix': Accessibility.READABLE,
+            'run': Accessibility.READABLE,
+            'tmp': Accessibility.WRITABLE,
+
+            'var': Accessibility.READABLE,
+            'var/tmp': Accessibility.WRITABLE,
+          })
+
+          assert os.getuid() != 0
+          assert os.getgid() != 0
+
+          with pytest.raises(OSError) as excinfo:
+            Path('/bin/test').touch()
+          assert excinfo.value.errno == errno.EROFS
         '';
       }
       (let
         symlink = pkgs.runCommand "symlink" {
-          target = pkgs.writeText "symlink-target" "got me\n";
+          target = pkgs.writeText "symlink-target" "got me";
         } "ln -s \"$target\" \"$out\"";
       in {
         description = "check if symlinks are properly bind-mounted";
         config.confinement.packages = lib.singleton symlink;
         testScript = ''
-          machine.succeed("chroot-exec rmdir /etc")
-          text = machine.succeed('chroot-exec cat ${symlink}').strip()
-          assert_eq(text, "got me")
+          Path('/etc').rmdir()
+          assert Path('${symlink}').read_text() == 'got me'
         '';
       })
       { description = "check if StateDirectory works";
         config.serviceConfig.User = "chroot-testuser";
         config.serviceConfig.Group = "chroot-testgroup";
         config.serviceConfig.StateDirectory = "testme";
+
+        # We restart on purpose here since we want to check whether the state
+        # directory actually persists.
+        config.serviceConfig.Restart = "on-failure";
+        config.serviceConfig.RestartMode = "direct";
+
         testScript = ''
-          machine.succeed("chroot-exec touch /tmp/canary")
-          machine.succeed('chroot-exec "echo works > /var/lib/testme/foo"')
-          machine.succeed('test "$(< /var/lib/testme/foo)" = works')
-          machine.succeed("test ! -e /tmp/canary")
+          assert not Path('/tmp/canary').exists()
+          Path('/tmp/canary').touch()
+
+          if (foo := Path('/var/lib/testme/foo')).exists():
+            assert Path('/var/lib/testme/foo').read_text() == 'works'
+          else:
+            Path('/var/lib/testme/foo').write_text('works')
+            print('<4>Exiting with failure to check persistence on restart.')
+            raise SystemExit(1)
         '';
       }
       { description = "check if /bin/sh works";
         testScript = ''
-          machine.succeed(
-            "chroot-exec test -e /bin/sh",
-            'test "$(chroot-exec \'/bin/sh -c "echo bar"\')" = bar',
+          assert Path('/bin/sh').exists()
+
+          result = run(
+            ['/bin/sh', '-c', 'echo -n bar'],
+            capture_output=True,
+            check=True,
           )
+          assert result.stdout == b'bar'
         '';
       }
       { description = "check if suppressing /bin/sh works";
         config.confinement.binSh = null;
         testScript = ''
-          machine.succeed(
-            'chroot-exec test ! -e /bin/sh',
-            'test "$(chroot-exec \'/bin/sh -c "echo foo"\')" != foo',
-          )
+          assert not Path('/bin/sh').exists()
+          with pytest.raises(FileNotFoundError):
+            run(['/bin/sh', '-c', 'echo foo'])
         '';
       }
       { description = "check if we can set /bin/sh to something different";
         config.confinement.binSh = "${pkgs.hello}/bin/hello";
         testScript = ''
-          machine.succeed("chroot-exec test -e /bin/sh")
-          machine.succeed('test "$(chroot-exec /bin/sh -g foo)" = foo')
+          assert Path('/bin/sh').exists()
+          result = run(
+            ['/bin/sh', '-g', 'foo'],
+            capture_output=True,
+            check=True,
+          )
+          assert result.stdout == b'foo\n'
         '';
       }
       { description = "check if only Exec* dependencies are included";
-        config.environment.FOOBAR = pkgs.writeText "foobar" "eek\n";
+        config.environment.FOOBAR = pkgs.writeText "foobar" "eek";
         testScript = ''
-          machine.succeed('test "$(chroot-exec \'cat "$FOOBAR"\')" != eek')
+          with pytest.raises(FileNotFoundError):
+            Path(os.environ['FOOBAR']).read_text()
         '';
       }
-      { description = "check if all unit dependencies are included";
-        config.environment.FOOBAR = pkgs.writeText "foobar" "eek\n";
+      { description = "check if fullUnit includes all dependencies";
+        config.environment.FOOBAR = pkgs.writeText "foobar" "eek";
         config.confinement.fullUnit = true;
         testScript = ''
-          machine.succeed('test "$(chroot-exec \'cat "$FOOBAR"\')" = eek')
+          assert Path(os.environ['FOOBAR']).read_text() == 'eek'
         '';
       }
       { description = "check if shipped unit file still works";
-        serviceName = "shipped-unitfile";
         config.confinement.mode = "chroot-only";
+        rawUnit = ''
+          [Service]
+          SystemCallFilter=~kill
+          SystemCallErrorNumber=ELOOP
+        '';
         testScript = ''
-          machine.succeed(
-            'chroot-exec \'kill -9 $$ 2>&1 || :\' | '
-            'grep -q "Too many levels of symbolic links"'
-          )
+          with pytest.raises(OSError) as excinfo:
+            os.kill(os.getpid(), signal.SIGKILL)
+          assert excinfo.value.errno == errno.ELOOP
         '';
       }
     ];
 
-    options.__testSteps = lib.mkOption {
-      type = lib.types.lines;
-      description = "All of the test steps combined as a single script.";
-    };
-
-    config.environment.systemPackages = lib.singleton testClient;
-    config.systemd.packages = lib.singleton (pkgs.writeTextFile {
-      name = "shipped-unitfile";
-      destination = "/etc/systemd/system/shipped-unitfile@.service";
-      text = ''
-        [Service]
-        SystemCallFilter=~kill
-        SystemCallErrorNumber=ELOOP
-      '';
-    });
-
     config.users.groups.chroot-testgroup = {};
     config.users.users.chroot-testuser = {
       isSystemUser = true;
@@ -330,16 +328,7 @@ import ../make-test-python.nix {
     };
   };
 
-  testScript = { nodes, ... }: ''
-    from textwrap import dedent
-    import difflib
-
-    def assert_eq(got, expected):
-      if got != expected:
-        diff = difflib.unified_diff(got.splitlines(keepends=True), expected.splitlines(keepends=True))
-        print("".join(diff))
-      assert got == expected, f"{got} != {expected}"
-
+  testScript = ''
     machine.wait_for_unit("multi-user.target")
-  '' + nodes.machine.__testSteps;
+  '';
 }