about summary refs log tree commit diff
path: root/nixos/tests/systemd-confinement
diff options
context:
space:
mode:
Diffstat (limited to 'nixos/tests/systemd-confinement')
-rw-r--r--nixos/tests/systemd-confinement/checkperms.py187
-rw-r--r--nixos/tests/systemd-confinement/default.nix274
2 files changed, 461 insertions, 0 deletions
diff --git a/nixos/tests/systemd-confinement/checkperms.py b/nixos/tests/systemd-confinement/checkperms.py
new file mode 100644
index 0000000000000..3c7ba279a3d20
--- /dev/null
+++ b/nixos/tests/systemd-confinement/checkperms.py
@@ -0,0 +1,187 @@
+import errno
+import os
+
+from enum import IntEnum
+from pathlib import Path
+
+
+class Accessibility(IntEnum):
+    """
+    The level of accessibility we have on a file or directory.
+
+    This is needed to assess the attack surface on the file system namespace we
+    have within a confined service. Higher levels mean more permissions for the
+    user and thus a bigger attack surface.
+    """
+    NONE = 0
+
+    # Directories can be listed or files can be read.
+    READABLE = 1
+
+    # This is for special file systems such as procfs and for stuff such as
+    # FIFOs or character special files. The reason why this has a lower value
+    # than WRITABLE is because those files are more restricted on what and how
+    # they can be written to.
+    SPECIAL = 2
+
+    # Another special case are sticky directories, which do allow write access
+    # but restrict deletion. This does *not* apply to sticky directories that
+    # are read-only.
+    STICKY = 3
+
+    # Essentially full permissions, the kind of accessibility we want to avoid
+    # in most cases.
+    WRITABLE = 4
+
+    def assert_on(self, path: Path) -> None:
+        """
+        Raise an AssertionError if the given 'path' allows for more
+        accessibility than 'self'.
+        """
+        actual = self.NONE
+
+        if path.is_symlink():
+            actual = self.READABLE
+        elif path.is_dir():
+            writable = True
+
+            dummy_file = path / 'can_i_write'
+            try:
+                dummy_file.touch()
+            except OSError as e:
+                if e.errno in [errno.EROFS, errno.EACCES]:
+                    writable = False
+                else:
+                    raise
+            else:
+                dummy_file.unlink()
+
+            if writable:
+                # The reason why we test this *after* we made sure it's
+                # writable is because we could have a sticky directory where
+                # the current user doesn't have write access.
+                if path.stat().st_mode & 0o1000 == 0o1000:
+                    actual = self.STICKY
+                else:
+                    actual = self.WRITABLE
+            else:
+                actual = self.READABLE
+        elif path.is_file():
+            try:
+                with path.open('rb') as fp:
+                    fp.read(1)
+                actual = self.READABLE
+            except PermissionError:
+                pass
+
+            writable = True
+            try:
+                with path.open('ab') as fp:
+                    fp.write('x')
+                    size = fp.tell()
+                    fp.truncate(size)
+            except PermissionError:
+                writable = False
+            except OSError as e:
+                if e.errno == errno.ETXTBSY:
+                    writable = os.access(path, os.W_OK)
+                elif e.errno == errno.EROFS:
+                    writable = False
+                else:
+                    raise
+
+            # Let's always try to fail towards being writable, so if *either*
+            # access(2) or a real write is successful it's writable. This is to
+            # make sure we don't accidentally introduce no-ops if we have bugs
+            # in the more complicated real write code above.
+            if writable or os.access(path, os.W_OK):
+                actual = self.WRITABLE
+        else:
+            # We need to be very careful when writing to or reading from
+            # special files (eg.  FIFOs), since they can possibly block. So if
+            # it's not a file, just trust that access(2) won't lie.
+            if os.access(path, os.R_OK):
+                actual = self.READABLE
+
+            if os.access(path, os.W_OK):
+                actual = self.SPECIAL
+
+        if actual > self:
+            stat = path.stat()
+            details = ', '.join([
+                f'permissions: {stat.st_mode & 0o7777:o}',
+                f'uid: {stat.st_uid}',
+                f'group: {stat.st_gid}',
+            ])
+
+            raise AssertionError(
+                f'Expected at most {self!r} but got {actual!r} for path'
+                f' {path} ({details}).'
+            )
+
+
+def is_special_fs(path: Path) -> bool:
+    """
+    Check whether the given path truly is a special file system such as procfs
+    or sysfs.
+    """
+    try:
+        if path == Path('/proc'):
+            return (path / 'version').read_text().startswith('Linux')
+        elif path == Path('/sys'):
+            return b'Linux' in (path / 'kernel' / 'notes').read_bytes()
+    except FileNotFoundError:
+        pass
+    return False
+
+
+def is_empty_dir(path: Path) -> bool:
+    try:
+        next(path.iterdir())
+        return False
+    except (StopIteration, PermissionError):
+        return True
+
+
+def _assert_permissions_in_directory(
+    directory: Path,
+    accessibility: Accessibility,
+    subdirs: dict[Path, Accessibility],
+) -> None:
+    accessibility.assert_on(directory)
+
+    for file in directory.iterdir():
+        if is_special_fs(file):
+            msg = f'Got unexpected special filesystem at {file}.'
+            assert subdirs.pop(file) == Accessibility.SPECIAL, msg
+        elif not file.is_symlink() and file.is_dir():
+            subdir_access = subdirs.pop(file, accessibility)
+            if is_empty_dir(file):
+                # Whenever we got an empty directory, we check the permission
+                # constraints on the current directory (except if specified
+                # explicitly in subdirs) because for example if we're non-root
+                # (the constraints of the current directory are thus
+                # Accessibility.READABLE), we really have to make sure that
+                # empty directories are *never* writable.
+                subdir_access.assert_on(file)
+            else:
+                _assert_permissions_in_directory(file, subdir_access, subdirs)
+        else:
+            subdirs.pop(file, accessibility).assert_on(file)
+
+
+def assert_permissions(subdirs: dict[str, Accessibility]) -> None:
+    """
+    Recursively check whether the file system conforms to the accessibility
+    specification we specified via 'subdirs'.
+    """
+    root = Path('/')
+    absolute_subdirs = {root / p: a for p, a in subdirs.items()}
+    _assert_permissions_in_directory(
+        root,
+        Accessibility.WRITABLE if os.getuid() == 0 else Accessibility.READABLE,
+        absolute_subdirs,
+    )
+    for file in absolute_subdirs.keys():
+        msg = f'Expected {file} to exist, but it was nowwhere to be found.'
+        raise AssertionError(msg)
diff --git a/nixos/tests/systemd-confinement/default.nix b/nixos/tests/systemd-confinement/default.nix
new file mode 100644
index 0000000000000..15d442d476b08
--- /dev/null
+++ b/nixos/tests/systemd-confinement/default.nix
@@ -0,0 +1,274 @@
+import ../make-test-python.nix {
+  name = "systemd-confinement";
+
+  nodes.machine = { pkgs, lib, ... }: let
+    testLib = pkgs.python3Packages.buildPythonPackage {
+      name = "confinement-testlib";
+      unpackPhase = ''
+        cat > setup.py <<EOF
+        from setuptools import setup
+        setup(name='confinement-testlib', py_modules=["checkperms"])
+        EOF
+        cp ${./checkperms.py} checkperms.py
+      '';
+    };
+
+    mkTest = name: testScript: pkgs.writers.writePython3 "${name}.py" {
+      libraries = [ pkgs.python3Packages.pytest testLib ];
+    } ''
+      # This runs our test script by using pytest's assertion rewriting, so
+      # that whenever we use "assert <something>", the actual values are
+      # printed rather than getting a generic AssertionError or the need to
+      # pass an explicit assertion error message.
+      import ast
+      from pathlib import Path
+      from _pytest.assertion.rewrite import rewrite_asserts
+
+      script = Path('${pkgs.writeText "${name}-main.py" ''
+        import errno, os, pytest, signal
+        from subprocess import run
+        from checkperms import Accessibility, assert_permissions
+
+        ${testScript}
+      ''}') # noqa
+      filename = str(script)
+      source = script.read_bytes()
+
+      tree = ast.parse(source, filename=filename)
+      rewrite_asserts(tree, source, filename)
+      exec(compile(tree, filename, 'exec', dont_inherit=True))
+    '';
+
+    mkTestStep = num: {
+      description,
+      testScript,
+      config ? {},
+      serviceName ? "test${toString num}",
+      rawUnit ? null,
+    }: {
+      systemd.packages = lib.optional (rawUnit != null) (pkgs.writeTextFile {
+        name = serviceName;
+        destination = "/etc/systemd/system/${serviceName}.service";
+        text = rawUnit;
+      });
+
+      systemd.services.${serviceName} = {
+        inherit description;
+        requiredBy = [ "multi-user.target" ];
+        confinement = (config.confinement or {}) // { enable = true; };
+        serviceConfig = (config.serviceConfig or {}) // {
+          ExecStart = mkTest serviceName testScript;
+          Type = "oneshot";
+        };
+      } // removeAttrs config [ "confinement" "serviceConfig" ];
+    };
+
+    parametrisedTests = lib.concatMap ({ user, privateTmp }: let
+      withTmp = if privateTmp then "with PrivateTmp" else "without PrivateTmp";
+
+      serviceConfig = if user == "static-user" then {
+        User = "chroot-testuser";
+        Group = "chroot-testgroup";
+      } else if user == "dynamic-user" then {
+        DynamicUser = true;
+      } else {};
+
+    in [
+      { description = "${user}, chroot-only confinement ${withTmp}";
+        config = {
+          confinement.mode = "chroot-only";
+          # Only set if privateTmp is true to ensure that the default is false.
+          serviceConfig = serviceConfig // lib.optionalAttrs privateTmp {
+            PrivateTmp = true;
+          };
+        };
+        testScript = if user == "root" then ''
+          assert os.getuid() == 0
+          assert os.getgid() == 0
+
+          assert_permissions({
+            'bin': Accessibility.READABLE,
+            'nix': Accessibility.READABLE,
+            'run': Accessibility.READABLE,
+            ${lib.optionalString privateTmp "'tmp': Accessibility.STICKY,"}
+            ${lib.optionalString privateTmp "'var': Accessibility.READABLE,"}
+            ${lib.optionalString privateTmp "'var/tmp': Accessibility.STICKY,"}
+          })
+        '' else ''
+          assert os.getuid() != 0
+          assert os.getgid() != 0
+
+          assert_permissions({
+            'bin': Accessibility.READABLE,
+            'nix': Accessibility.READABLE,
+            'run': Accessibility.READABLE,
+            ${lib.optionalString privateTmp "'tmp': Accessibility.STICKY,"}
+            ${lib.optionalString privateTmp "'var': Accessibility.READABLE,"}
+            ${lib.optionalString privateTmp "'var/tmp': Accessibility.STICKY,"}
+          })
+        '';
+      }
+      { description = "${user}, full APIVFS confinement ${withTmp}";
+        config = {
+          # Only set if privateTmp is false to ensure that the default is true.
+          serviceConfig = serviceConfig // lib.optionalAttrs (!privateTmp) {
+            PrivateTmp = false;
+          };
+        };
+        testScript = if user == "root" then ''
+          assert os.getuid() == 0
+          assert os.getgid() == 0
+
+          assert_permissions({
+            'bin': Accessibility.READABLE,
+            'nix': Accessibility.READABLE,
+            ${lib.optionalString privateTmp "'tmp': Accessibility.STICKY,"}
+            'run': Accessibility.WRITABLE,
+
+            'proc': Accessibility.SPECIAL,
+            'sys': Accessibility.SPECIAL,
+            'dev': Accessibility.WRITABLE,
+
+            ${lib.optionalString privateTmp "'var': Accessibility.READABLE,"}
+            ${lib.optionalString privateTmp "'var/tmp': Accessibility.STICKY,"}
+          })
+        '' else ''
+          assert os.getuid() != 0
+          assert os.getgid() != 0
+
+          assert_permissions({
+            'bin': Accessibility.READABLE,
+            'nix': Accessibility.READABLE,
+            ${lib.optionalString privateTmp "'tmp': Accessibility.STICKY,"}
+            'run': Accessibility.STICKY,
+
+            'proc': Accessibility.SPECIAL,
+            'sys': Accessibility.SPECIAL,
+            'dev': Accessibility.SPECIAL,
+            'dev/shm': Accessibility.STICKY,
+            'dev/mqueue': Accessibility.STICKY,
+
+            ${lib.optionalString privateTmp "'var': Accessibility.READABLE,"}
+            ${lib.optionalString privateTmp "'var/tmp': Accessibility.STICKY,"}
+          })
+        '';
+      }
+    ]) (lib.cartesianProductOfSets {
+      user = [ "root" "dynamic-user" "static-user" ];
+      privateTmp = [ true false ];
+    });
+
+  in {
+    imports = lib.imap1 mkTestStep (parametrisedTests ++ [
+      { description = "existence of bind-mounted /etc";
+        config.serviceConfig.BindReadOnlyPaths = [ "/etc" ];
+        testScript = ''
+          assert Path('/etc/passwd').read_text()
+        '';
+      }
+      (let
+        symlink = pkgs.runCommand "symlink" {
+          target = pkgs.writeText "symlink-target" "got me";
+        } "ln -s \"$target\" \"$out\"";
+      in {
+        description = "check if symlinks are properly bind-mounted";
+        config.confinement.packages = lib.singleton symlink;
+        testScript = ''
+          assert Path('${symlink}').read_text() == 'got me'
+        '';
+      })
+      { description = "check if StateDirectory works";
+        config.serviceConfig.User = "chroot-testuser";
+        config.serviceConfig.Group = "chroot-testgroup";
+        config.serviceConfig.StateDirectory = "testme";
+
+        # We restart on purpose here since we want to check whether the state
+        # directory actually persists.
+        config.serviceConfig.Restart = "on-failure";
+        config.serviceConfig.RestartMode = "direct";
+
+        testScript = ''
+          assert not Path('/tmp/canary').exists()
+          Path('/tmp/canary').touch()
+
+          if (foo := Path('/var/lib/testme/foo')).exists():
+            assert Path('/var/lib/testme/foo').read_text() == 'works'
+          else:
+            Path('/var/lib/testme/foo').write_text('works')
+            print('<4>Exiting with failure to check persistence on restart.')
+            raise SystemExit(1)
+        '';
+      }
+      { description = "check if /bin/sh works";
+        testScript = ''
+          assert Path('/bin/sh').exists()
+
+          result = run(
+            ['/bin/sh', '-c', 'echo -n bar'],
+            capture_output=True,
+            check=True,
+          )
+          assert result.stdout == b'bar'
+        '';
+      }
+      { description = "check if suppressing /bin/sh works";
+        config.confinement.binSh = null;
+        testScript = ''
+          assert not Path('/bin/sh').exists()
+          with pytest.raises(FileNotFoundError):
+            run(['/bin/sh', '-c', 'echo foo'])
+        '';
+      }
+      { description = "check if we can set /bin/sh to something different";
+        config.confinement.binSh = "${pkgs.hello}/bin/hello";
+        testScript = ''
+          assert Path('/bin/sh').exists()
+          result = run(
+            ['/bin/sh', '-g', 'foo'],
+            capture_output=True,
+            check=True,
+          )
+          assert result.stdout == b'foo\n'
+        '';
+      }
+      { description = "check if only Exec* dependencies are included";
+        config.environment.FOOBAR = pkgs.writeText "foobar" "eek";
+        testScript = ''
+          with pytest.raises(FileNotFoundError):
+            Path(os.environ['FOOBAR']).read_text()
+        '';
+      }
+      { description = "check if fullUnit includes all dependencies";
+        config.environment.FOOBAR = pkgs.writeText "foobar" "eek";
+        config.confinement.fullUnit = true;
+        testScript = ''
+          assert Path(os.environ['FOOBAR']).read_text() == 'eek'
+        '';
+      }
+      { description = "check if shipped unit file still works";
+        config.confinement.mode = "chroot-only";
+        rawUnit = ''
+          [Service]
+          SystemCallFilter=~kill
+          SystemCallErrorNumber=ELOOP
+        '';
+        testScript = ''
+          with pytest.raises(OSError) as excinfo:
+            os.kill(os.getpid(), signal.SIGKILL)
+          assert excinfo.value.errno == errno.ELOOP
+        '';
+      }
+    ]);
+
+    config.users.groups.chroot-testgroup = {};
+    config.users.users.chroot-testuser = {
+      isSystemUser = true;
+      description = "Chroot Test User";
+      group = "chroot-testgroup";
+    };
+  };
+
+  testScript = ''
+    machine.wait_for_unit("multi-user.target")
+  '';
+}