diff options
Diffstat (limited to 'nixos/tests/systemd-confinement')
-rw-r--r-- | nixos/tests/systemd-confinement/checkperms.py | 187 | ||||
-rw-r--r-- | nixos/tests/systemd-confinement/default.nix | 274 |
2 files changed, 461 insertions, 0 deletions
"""
Helpers for asserting how much of the file system a confined service can
reach.

The entry point is assert_permissions(), which walks the file system
starting at '/' and raises an AssertionError as soon as a path grants more
access than the given specification allows.
"""
import errno
import os

from enum import IntEnum
from pathlib import Path


def _dir_is_writable(path: Path) -> bool:
    """
    Probe whether new entries can be created inside the directory 'path'.

    The probe creates (and removes again) a file called 'can_i_write'. A
    read-only file system or missing permissions count as "not writable",
    any other OSError is propagated to the caller.
    """
    dummy_file = path / 'can_i_write'
    try:
        dummy_file.touch()
    except OSError as e:
        if e.errno in (errno.EROFS, errno.EACCES):
            return False
        raise
    dummy_file.unlink()
    return True


def _file_is_writable(path: Path) -> bool:
    """
    Probe whether data can be appended to the regular file 'path'.

    One byte is appended and the file is truncated back to its previous size
    afterwards, so a successful probe leaves the content untouched. Missing
    permissions or a read-only file system count as "not writable"; for a
    busy executable (ETXTBSY) the answer of access(2) is used instead. Any
    other OSError is propagated to the caller.
    """
    try:
        # Unbuffered, so the write hits the file right away and there is no
        # pending buffer that could be flushed after the truncate.
        with path.open('ab', buffering=0) as fp:
            # Has to be recorded *before* writing, otherwise we would
            # truncate to the new size and leave the probe byte behind.
            original_size = os.fstat(fp.fileno()).st_size
            fp.write(b'x')
            fp.truncate(original_size)
    except PermissionError:
        return False
    except OSError as e:
        if e.errno == errno.ETXTBSY:
            return os.access(path, os.W_OK)
        if e.errno == errno.EROFS:
            return False
        raise
    return True


class Accessibility(IntEnum):
    """
    The level of accessibility we have on a file or directory.

    This is needed to assess the attack surface on the file system namespace we
    have within a confined service. Higher levels mean more permissions for the
    user and thus a bigger attack surface.
    """
    NONE = 0

    # Directories can be listed or files can be read.
    READABLE = 1

    # This is for special file systems such as procfs and for stuff such as
    # FIFOs or character special files. The reason why this has a lower value
    # than WRITABLE is because those files are more restricted on what and how
    # they can be written to.
    SPECIAL = 2

    # Another special case are sticky directories, which do allow write access
    # but restrict deletion. This does *not* apply to sticky directories that
    # are read-only.
    STICKY = 3

    # Essentially full permissions, the kind of accessibility we want to avoid
    # in most cases.
    WRITABLE = 4

    def assert_on(self, path: Path) -> None:
        """
        Raise an AssertionError if the given 'path' allows for more
        accessibility than 'self'.

        Symlinks are always rated READABLE, directories and regular files
        are probed with real file system operations, and everything else is
        rated solely via access(2).
        """
        actual = Accessibility.NONE

        if path.is_symlink():
            actual = Accessibility.READABLE
        elif path.is_dir():
            if _dir_is_writable(path):
                # The reason why we test this *after* we made sure it's
                # writable is because we could have a sticky directory where
                # the current user doesn't have write access.
                if path.stat().st_mode & 0o1000 == 0o1000:
                    actual = Accessibility.STICKY
                else:
                    actual = Accessibility.WRITABLE
            else:
                actual = Accessibility.READABLE
        elif path.is_file():
            try:
                with path.open('rb') as fp:
                    fp.read(1)
                actual = Accessibility.READABLE
            except PermissionError:
                pass

            # Let's always try to fail towards being writable, so if *either*
            # access(2) or a real write is successful it's writable. This is to
            # make sure we don't accidentally introduce no-ops if we have bugs
            # in the more complicated real write code.
            if _file_is_writable(path) or os.access(path, os.W_OK):
                actual = Accessibility.WRITABLE
        else:
            # We need to be very careful when writing to or reading from
            # special files (eg. FIFOs), since they can possibly block. So if
            # it's not a file, just trust that access(2) won't lie.
            if os.access(path, os.R_OK):
                actual = Accessibility.READABLE

            if os.access(path, os.W_OK):
                actual = Accessibility.SPECIAL

        if actual > self:
            stat = path.stat()
            details = ', '.join([
                f'permissions: {stat.st_mode & 0o7777:o}',
                f'uid: {stat.st_uid}',
                f'group: {stat.st_gid}',
            ])

            raise AssertionError(
                f'Expected at most {self!r} but got {actual!r} for path'
                f' {path} ({details}).'
            )


def is_special_fs(path: Path) -> bool:
    """
    Check whether the given path truly is a special file system such as procfs
    or sysfs.

    Only the exact paths '/proc' and '/sys' are considered and each of them
    is verified by reading a well-known file below it.
    """
    try:
        if path == Path('/proc'):
            return (path / 'version').read_text().startswith('Linux')
        elif path == Path('/sys'):
            return b'Linux' in (path / 'kernel' / 'notes').read_bytes()
    except FileNotFoundError:
        pass
    return False


def is_empty_dir(path: Path) -> bool:
    """
    Return True if the directory 'path' has no entries or if we are not
    permitted to list it.
    """
    try:
        next(path.iterdir())
        return False
    except (StopIteration, PermissionError):
        return True


def _assert_permissions_in_directory(
    directory: Path,
    accessibility: Accessibility,
    subdirs: dict[Path, Accessibility],
) -> None:
    """
    Check 'directory' and everything below it against 'accessibility'.

    Entries found in 'subdirs' override the accessibility for that path (and
    for everything below it, if it is a directory). Every override that was
    used is removed from 'subdirs', so whatever is left afterwards refers to
    paths that were never encountered.
    """
    accessibility.assert_on(directory)

    for file in directory.iterdir():
        if is_special_fs(file):
            # A special file system has to be listed explicitly, so use a
            # default here to get a proper AssertionError instead of a
            # KeyError if it isn't part of the specification at all.
            if subdirs.pop(file, None) != Accessibility.SPECIAL:
                msg = f'Got unexpected special filesystem at {file}.'
                raise AssertionError(msg)
        elif not file.is_symlink() and file.is_dir():
            subdir_access = subdirs.pop(file, accessibility)
            if is_empty_dir(file):
                # Whenever we got an empty directory, we check the permission
                # constraints on the current directory (except if specified
                # explicitly in subdirs) because for example if we're non-root
                # (the constraints of the current directory are thus
                # Accessibility.READABLE), we really have to make sure that
                # empty directories are *never* writable.
                subdir_access.assert_on(file)
            else:
                _assert_permissions_in_directory(file, subdir_access, subdirs)
        else:
            subdirs.pop(file, accessibility).assert_on(file)


def assert_permissions(subdirs: dict[str, Accessibility]) -> None:
    """
    Recursively check whether the file system conforms to the accessibility
    specification we specified via 'subdirs'.

    The keys of 'subdirs' are paths relative to '/'. Paths without an
    explicit entry must not exceed WRITABLE if we're running as root and
    READABLE otherwise. An AssertionError is raised for the first path that
    is more accessible than allowed, as well as for a path in 'subdirs' that
    doesn't exist.
    """
    root = Path('/')
    absolute_subdirs = {root / p: a for p, a in subdirs.items()}
    _assert_permissions_in_directory(
        root,
        Accessibility.WRITABLE if os.getuid() == 0 else Accessibility.READABLE,
        absolute_subdirs,
    )
    # Everything that was encountered during the walk has been popped, so any
    # remaining entry refers to a path that doesn't exist.
    for file in absolute_subdirs:
        msg = f'Expected {file} to exist, but it was nowhere to be found.'
        raise AssertionError(msg)
null, + }: { + systemd.packages = lib.optional (rawUnit != null) (pkgs.writeTextFile { + name = serviceName; + destination = "/etc/systemd/system/${serviceName}.service"; + text = rawUnit; + }); + + systemd.services.${serviceName} = { + inherit description; + requiredBy = [ "multi-user.target" ]; + confinement = (config.confinement or {}) // { enable = true; }; + serviceConfig = (config.serviceConfig or {}) // { + ExecStart = mkTest serviceName testScript; + Type = "oneshot"; + }; + } // removeAttrs config [ "confinement" "serviceConfig" ]; + }; + + parametrisedTests = lib.concatMap ({ user, privateTmp }: let + withTmp = if privateTmp then "with PrivateTmp" else "without PrivateTmp"; + + serviceConfig = if user == "static-user" then { + User = "chroot-testuser"; + Group = "chroot-testgroup"; + } else if user == "dynamic-user" then { + DynamicUser = true; + } else {}; + + in [ + { description = "${user}, chroot-only confinement ${withTmp}"; + config = { + confinement.mode = "chroot-only"; + # Only set if privateTmp is true to ensure that the default is false. 
+ serviceConfig = serviceConfig // lib.optionalAttrs privateTmp { + PrivateTmp = true; + }; + }; + testScript = if user == "root" then '' + assert os.getuid() == 0 + assert os.getgid() == 0 + + assert_permissions({ + 'bin': Accessibility.READABLE, + 'nix': Accessibility.READABLE, + 'run': Accessibility.READABLE, + ${lib.optionalString privateTmp "'tmp': Accessibility.STICKY,"} + ${lib.optionalString privateTmp "'var': Accessibility.READABLE,"} + ${lib.optionalString privateTmp "'var/tmp': Accessibility.STICKY,"} + }) + '' else '' + assert os.getuid() != 0 + assert os.getgid() != 0 + + assert_permissions({ + 'bin': Accessibility.READABLE, + 'nix': Accessibility.READABLE, + 'run': Accessibility.READABLE, + ${lib.optionalString privateTmp "'tmp': Accessibility.STICKY,"} + ${lib.optionalString privateTmp "'var': Accessibility.READABLE,"} + ${lib.optionalString privateTmp "'var/tmp': Accessibility.STICKY,"} + }) + ''; + } + { description = "${user}, full APIVFS confinement ${withTmp}"; + config = { + # Only set if privateTmp is false to ensure that the default is true. 
+ serviceConfig = serviceConfig // lib.optionalAttrs (!privateTmp) { + PrivateTmp = false; + }; + }; + testScript = if user == "root" then '' + assert os.getuid() == 0 + assert os.getgid() == 0 + + assert_permissions({ + 'bin': Accessibility.READABLE, + 'nix': Accessibility.READABLE, + ${lib.optionalString privateTmp "'tmp': Accessibility.STICKY,"} + 'run': Accessibility.WRITABLE, + + 'proc': Accessibility.SPECIAL, + 'sys': Accessibility.SPECIAL, + 'dev': Accessibility.WRITABLE, + + ${lib.optionalString privateTmp "'var': Accessibility.READABLE,"} + ${lib.optionalString privateTmp "'var/tmp': Accessibility.STICKY,"} + }) + '' else '' + assert os.getuid() != 0 + assert os.getgid() != 0 + + assert_permissions({ + 'bin': Accessibility.READABLE, + 'nix': Accessibility.READABLE, + ${lib.optionalString privateTmp "'tmp': Accessibility.STICKY,"} + 'run': Accessibility.STICKY, + + 'proc': Accessibility.SPECIAL, + 'sys': Accessibility.SPECIAL, + 'dev': Accessibility.SPECIAL, + 'dev/shm': Accessibility.STICKY, + 'dev/mqueue': Accessibility.STICKY, + + ${lib.optionalString privateTmp "'var': Accessibility.READABLE,"} + ${lib.optionalString privateTmp "'var/tmp': Accessibility.STICKY,"} + }) + ''; + } + ]) (lib.cartesianProductOfSets { + user = [ "root" "dynamic-user" "static-user" ]; + privateTmp = [ true false ]; + }); + + in { + imports = lib.imap1 mkTestStep (parametrisedTests ++ [ + { description = "existence of bind-mounted /etc"; + config.serviceConfig.BindReadOnlyPaths = [ "/etc" ]; + testScript = '' + assert Path('/etc/passwd').read_text() + ''; + } + (let + symlink = pkgs.runCommand "symlink" { + target = pkgs.writeText "symlink-target" "got me"; + } "ln -s \"$target\" \"$out\""; + in { + description = "check if symlinks are properly bind-mounted"; + config.confinement.packages = lib.singleton symlink; + testScript = '' + assert Path('${symlink}').read_text() == 'got me' + ''; + }) + { description = "check if StateDirectory works"; + config.serviceConfig.User = 
"chroot-testuser"; + config.serviceConfig.Group = "chroot-testgroup"; + config.serviceConfig.StateDirectory = "testme"; + + # We restart on purpose here since we want to check whether the state + # directory actually persists. + config.serviceConfig.Restart = "on-failure"; + config.serviceConfig.RestartMode = "direct"; + + testScript = '' + assert not Path('/tmp/canary').exists() + Path('/tmp/canary').touch() + + if (foo := Path('/var/lib/testme/foo')).exists(): + assert Path('/var/lib/testme/foo').read_text() == 'works' + else: + Path('/var/lib/testme/foo').write_text('works') + print('<4>Exiting with failure to check persistence on restart.') + raise SystemExit(1) + ''; + } + { description = "check if /bin/sh works"; + testScript = '' + assert Path('/bin/sh').exists() + + result = run( + ['/bin/sh', '-c', 'echo -n bar'], + capture_output=True, + check=True, + ) + assert result.stdout == b'bar' + ''; + } + { description = "check if suppressing /bin/sh works"; + config.confinement.binSh = null; + testScript = '' + assert not Path('/bin/sh').exists() + with pytest.raises(FileNotFoundError): + run(['/bin/sh', '-c', 'echo foo']) + ''; + } + { description = "check if we can set /bin/sh to something different"; + config.confinement.binSh = "${pkgs.hello}/bin/hello"; + testScript = '' + assert Path('/bin/sh').exists() + result = run( + ['/bin/sh', '-g', 'foo'], + capture_output=True, + check=True, + ) + assert result.stdout == b'foo\n' + ''; + } + { description = "check if only Exec* dependencies are included"; + config.environment.FOOBAR = pkgs.writeText "foobar" "eek"; + testScript = '' + with pytest.raises(FileNotFoundError): + Path(os.environ['FOOBAR']).read_text() + ''; + } + { description = "check if fullUnit includes all dependencies"; + config.environment.FOOBAR = pkgs.writeText "foobar" "eek"; + config.confinement.fullUnit = true; + testScript = '' + assert Path(os.environ['FOOBAR']).read_text() == 'eek' + ''; + } + { description = "check if shipped unit file 
still works"; + config.confinement.mode = "chroot-only"; + rawUnit = '' + [Service] + SystemCallFilter=~kill + SystemCallErrorNumber=ELOOP + ''; + testScript = '' + with pytest.raises(OSError) as excinfo: + os.kill(os.getpid(), signal.SIGKILL) + assert excinfo.value.errno == errno.ELOOP + ''; + } + ]); + + config.users.groups.chroot-testgroup = {}; + config.users.users.chroot-testuser = { + isSystemUser = true; + description = "Chroot Test User"; + group = "chroot-testgroup"; + }; + }; + + testScript = '' + machine.wait_for_unit("multi-user.target") + ''; +} |