about summary refs log tree commit diff
path: root/pkgs/games
diff options
context:
space:
mode:
Diffstat (limited to 'pkgs/games')
-rw-r--r--pkgs/games/gog/default.nix1
-rw-r--r--pkgs/games/gog/factorio/default.nix44
-rw-r--r--pkgs/games/gog/factorio/patch.py440
-rw-r--r--pkgs/games/gog/factorio/test_patch.py210
4 files changed, 695 insertions, 0 deletions
diff --git a/pkgs/games/gog/default.nix b/pkgs/games/gog/default.nix
index b405016a..0061733e 100644
--- a/pkgs/games/gog/default.nix
+++ b/pkgs/games/gog/default.nix
@@ -19,6 +19,7 @@ let
     crosscode = callPackage ./crosscode.nix {};
     dungeons3 = callPackage ./dungeons3.nix {};
     epistory = callPackage ./epistory.nix { };
+    factorio = callPackage ./factorio {};
     freedom-planet = callPackage ./freedom-planet.nix {};
     gibbous = callPackage ./gibbous.nix {};
     graveyard-keeper = callPackage ./graveyard-keeper.nix {};
diff --git a/pkgs/games/gog/factorio/default.nix b/pkgs/games/gog/factorio/default.nix
new file mode 100644
index 00000000..109e03a2
--- /dev/null
+++ b/pkgs/games/gog/factorio/default.nix
@@ -0,0 +1,44 @@
+{ lib, buildGame, fetchGog, alsaLib, libGL, libpulseaudio, xorg
+, python3, python3Packages, runCommand
+}:
+
+buildGame rec {
+  name = "factorio-${version}";
+  version = "1.0.0";
+
+  src = fetchGog {
+    productId = 1238653230;
+    sha256 = "1n6a4qbp9i161jdn0f28ih7zc1blraqk0ypi3pizkc7ddc7bb7ja";
+  };
+
+  nativeBuildInputs = [ python3 python3Packages.r2pipe ];
+
+  buildInputs = [
+    alsaLib libGL libpulseaudio xorg.libICE xorg.libSM xorg.libX11
+    xorg.libXcursor xorg.libXext xorg.libXinerama xorg.libXrandr
+  ];
+
+  postPatch = "python3 ${lib.escapeShellArg (runCommand "patcher" {
+    src = lib.sourceFilesBySuffices ./. [ ".py" ];
+    nativeBuildInputs = [
+      python3Packages.mypy
+      python3Packages.flake8
+      python3Packages.pytest
+      python3Packages.hypothesis
+      python3Packages.r2pipe
+    ];
+  } ''
+    pytest "$src" -o "cache_dir=$PWD"
+    mypy "$src"
+    flake8 "$src"
+    install -vD -m 0644 "$src/patch.py" "$out"
+  '')} bin/x64/factorio";
+
+  installPhase = ''
+    install -vD bin/x64/factorio "$out/bin/factorio"
+    mkdir -p "$out/share"
+    cp -rT data "$out/share/factorio"
+  '';
+
+  sandbox.paths.required = [ "$XDG_DATA_HOME/factorio" ];
+}
diff --git a/pkgs/games/gog/factorio/patch.py b/pkgs/games/gog/factorio/patch.py
new file mode 100644
index 00000000..9ffd9bb6
--- /dev/null
+++ b/pkgs/games/gog/factorio/patch.py
@@ -0,0 +1,440 @@
+import os
+import sys
+import logging
+
+from contextlib import contextmanager
+from itertools import dropwhile
+from pathlib import Path
+from typing import Dict, List, Tuple, Any, Optional, ContextManager, \
+                   Generator, Callable
+from r2pipe import open as R2Pipe  # type: ignore
+
+
+class Compost:
+    """
+    Represents non-overlapping areas of the binary that can be re-used
+    to store additional data and code. In a high level way, this is a similar
+    interface to 'malloc' and 'free', where we use free() to dispose something
+    and alloc() to get something from the (compost) heap.
+    """
+    def __init__(self):
+        self.offsets: List[Tuple[int, int]] = []
+
+    @property
+    def available(self) -> int:
+        """
+        The total amount of data available for re-use, not necessarily
+        contiguous.
+        """
+        return sum(end + 1 - start for start, end in self.offsets)
+
+    @property
+    def largest_chunk(self) -> int:
+        """
+        The largest chunk of contiguous amount data that is avaliable for
+        re-use.
+        """
+        if self.offsets:
+            return self.offsets[-1][1] - self.offsets[-1][0] + 1
+        return 0
+
+    @property
+    def smallest_chunk(self) -> int:
+        """
+        The smallest chunk of contiguous amount data that is avaliable for
+        re-use.
+        """
+        if self.offsets:
+            return self.offsets[0][1] - self.offsets[0][0] + 1
+        return 0
+
+    def free(self, start: int, end: int) -> None:
+        """
+        Add the given byte range to the space that can be reused later.
+        """
+        if start < 0:
+            raise ValueError("Start offset needs to positive.")
+        if end < start:
+            msg = "End offset needs to be larger than start offset."
+            raise ValueError(msg)
+
+        to_delete = []
+        for n, existing in enumerate(self.offsets):
+            existing_start, existing_end = existing
+
+            if start <= existing_end and existing_start <= end:
+                wanted = start, end
+                msg = f"Range {wanted!r} is overlapping with {existing!r}."
+                raise ValueError(msg)
+
+            # If the offsets follow or precede existing ranges, merge them.
+            if existing_end + 1 == start:
+                start = existing_start
+                to_delete.append(n)
+            if end + 1 == existing_start:
+                end = existing_end
+                to_delete.append(n)
+        for n in to_delete:
+            del self.offsets[n]
+
+        self.offsets.append((start, end))
+        self.offsets.sort(key=lambda o: o[1] - o[0])
+
+    def alloc(self, size: int) -> int:
+        """
+        Get `size` bytes of data for re-use, mark them as used and return its
+        offset.
+        """
+        if size < 1:
+            raise ValueError("Allocation size has to be at least 1 byte.")
+        for n, (start, end) in enumerate(self.offsets):
+            if end - start + 1 >= size:
+                new_start = start + size
+                self.offsets[n] = (new_start, end)
+                if new_start == end + 1:
+                    del self.offsets[n]
+                return start
+        if size == 1:
+            msg = f"Unable to find space for one byte in {self.offsets!r}."
+        else:
+            msg = f"Unable to find space for {{}} bytes in {self.offsets!r}."
+        raise LookupError(msg.format(size))
+
+    @contextmanager
+    def maybe_alloc(
+        self, size: int
+    ) -> Generator[Tuple[int, Callable[[int], None]], None, None]:
+        """
+        Allocate at least `size` bytes and pass the offset and a callback to
+        the context. The offset is the start address of the allocated bytes and
+        the callback accepts a single argument denoting how many bytes were
+        actually used. After leaving the context, the excess bytes are freed
+        back to the compost.
+        """
+        offset = self.alloc(size)
+        used = 0
+
+        def _mark_used(used_size: int):
+            nonlocal used
+            if used_size < 0 or used_size > size:
+                raise ValueError(f"Tried to mark {used_size} bytes as used,"
+                                 f" but only {size} bytes are available.")
+            used = used_size
+
+        try:
+            yield offset, _mark_used
+        finally:
+            if used < size:
+                start = offset + used
+                self.free(start, offset + size - 1)
+
+
+class Patcher(ContextManager['Patcher']):
+    compost: Compost
+
+    def __init__(self, filename: str):
+        self.filename = filename
+        self.__r2: Optional[R2Pipe] = None
+        self.__addr: Optional[int] = None
+        self.compost = Compost()
+
+    def __enter__(self) -> 'Patcher':
+        self.__r2 = R2Pipe(self.filename, ['-w'])
+        logging.info(f'Analysing program {self.filename}.')
+        self.raw_command('aa')
+        return self
+
+    def __exit__(self, *exc_details) -> None:
+        assert self.__r2 is not None
+        self.__r2.quit()
+
+    def raw_command(self, cmd: str) -> str:
+        assert self.__r2 is not None
+        return self.__r2.cmd(cmd)
+
+    def raw_command_json(self, cmd: str) -> Any:
+        assert self.__r2 is not None
+        return self.__r2.cmdj(cmd)
+
+    @property
+    def address(self) -> int:
+        if self.__addr is None:
+            self.__addr = self.raw_command_json('?vi $$')
+        return self.__addr
+
+    @address.setter
+    def address(self, addr: int) -> None:
+        self.raw_command(f's {addr}')
+        self.__addr = addr
+
+    def __invalidate_address(self):
+        self.__addr = None
+
+    @contextmanager
+    def at_address(self, addr: int) -> Generator[None, None, None]:
+        """
+        Seek to given address and restore position after leaving context.
+        """
+        oldaddr = self.address
+        self.address = addr
+        try:
+            yield
+        finally:
+            self.address = oldaddr
+
+    def get_current_opcode_size(self, count: int = 1) -> int:
+        """
+        Return the size in bytes for the next `count` opcodes.
+        """
+        return self.raw_command_json(f'aos {count}')
+
+    def get_assembled_size(self, *opcodes: str) -> int:
+        """
+        Return the size in bytes of the given opcodes. Note that JMP locations
+        may be position-dependent, so you have to seek to the address first if
+        you want this to be reliable.
+        """
+        result = self.raw_command(f'"wa* {";".join(opcodes)}"')
+        assert result.startswith('wx ')
+        return len(result[3:].strip()) >> 1
+
+    def write_code(self, *opcodes: str) -> int:
+        """
+        Write the specified opcodes and return the written length in bytes.
+        """
+        oldaddr = self.address
+        logging.info(f'Writing assembly of {opcodes!r} to address'
+                     f' 0x{oldaddr:x}.')
+        self.raw_command(f'"wa {";".join(opcodes)}"')
+        self.raw_command(f'so {len(opcodes)}')
+        self.__invalidate_address()
+        return self.address - oldaddr
+
+    def replace_code(self, *opcodes: str, count: int = 1) -> int:
+        """
+        Write the specified opcodes over the next `count` opcodes at the
+        current position with padding and return the written length in bytes.
+        """
+        current_size = self.get_current_opcode_size(count)
+        expected_size = self.get_assembled_size(*opcodes)
+        assert expected_size <= current_size
+        padsize = current_size - expected_size
+        nops = ['nop'] * padsize
+        logging.info(f'Replacing {count} opcodes ({current_size} bytes)'
+                     f' at address 0x{self.address} with assembly'
+                     f' {opcodes!r} (padded with {padsize} nops).')
+        written = self.write_code(*opcodes, *nops)
+        assert written == current_size, \
+            f"Wanted to replace {count} opcodes consisting of {current_size}" \
+            f" bytes, however we've written {written} bytes instead."
+        return written
+
+    def write_code_to_compost(self, *opcodes: str) -> int:
+        """
+        Write the specified opcodes to the `Compost` area and return the
+        offset.
+        """
+        estimated = self.get_assembled_size(*opcodes)
+        with self.compost.maybe_alloc(estimated + 20) as (offset, mark_used):
+            with self.at_address(offset):
+                logging.info(f'Writing assembly {opcodes!r} to compost at'
+                             f' address 0x{offset:x}.')
+                size = self.get_assembled_size(*opcodes)
+                written = self.write_code(*opcodes)
+                mark_used(written)
+                assert written == size, \
+                    f"Assembled size for {opcodes!r} is {size} bytes," \
+                    f" but the actual code size is {written} bytes."
+            return offset
+
+    def write_cstring(self, value: str) -> int:
+        """
+        Write the specified C-String and return the length in bytes (which
+        includes the NUL byte).
+        """
+        logging.info(f'Writing C-String {value!r} to address'
+                     f' 0x{self.address:x}.')
+        self.raw_command(f'w {value}\\0')
+        size = len(value) + 1
+        self.raw_command(f's+ {size}')
+        self.address += size
+        return size
+
+    def write_cstring_to_compost(self, value: str) -> int:
+        """
+        Write the specified C-String to the `Compost` area and return its
+        offset.
+        """
+        size = len(value) + 1
+        offset = self.compost.alloc(size)
+        with self.at_address(offset):
+            logging.info(f'Writing C-String {value!r} to compost at'
+                         f' address 0x{offset:x}.')
+            self.raw_command(f'w {value}\\0')
+        return offset
+
+    def read_cstring(self, addr: int) -> Dict[Any, Any]:
+        """
+        Read the C-String at the specified `addr` and return its information.
+        """
+        return self.raw_command_json(f'pszj @{addr}')
+
+    def load_function(self, name: str) -> Dict[Any, Any]:
+        self.raw_command(f'sf {name}')
+        fun = self.raw_command_json('pdfj')
+        assert fun['name'] == name
+        self.__invalidate_address()
+        return fun
+
+    def skip_opcodes(self, amount: int = 1) -> int:
+        "Skip the specified amount of opcodes and return the total length."
+        oldaddr = self.address
+        self.raw_command(f'so {amount}')
+        self.__invalidate_address()
+        return self.address - oldaddr
+
+
+def patch_get_executable_name(patcher: Patcher, out: Path) -> None:
+    # We'll use the get_executable_name() function to not only hardcode the
+    # executable path but also use the unused part for static data referenced
+    # by other functions.
+    fun = patcher.load_function('sym.get_executable_name')
+    offset = fun["ops"][0]["offset"]
+
+    # Let's just assume 32 bytes, which is enough to hopefully fit in the code.
+    executable_offset = offset + 32
+
+    # This makes sure that get_executable_name always returns the binary at
+    # $out/bin/factorio, since we don't need to do all kinds of shenanigans at
+    # runtime when using Nix.
+    size = patcher.write_code(
+        f'mov edi, {executable_offset}',
+        'call sym.al_create_path',
+        'ret',
+    )
+    assert size < 32, "Replacement code for get_executable_name " \
+                      f"would overflow into compost area ({size} >= 32)."
+
+    executable = str(out / 'bin' / 'factorio')
+    patcher.address = executable_offset
+    size = patcher.write_cstring(executable)
+
+    # TODO: End should be the *real* end, not the offset of the last opcode!
+    patcher.compost.free(executable_offset + size, fun["ops"][-1]["offset"])
+
+
+def patch_read_data_path(patcher: Patcher, out: Path) -> None:
+    data_path = str(out / 'share' / 'factorio')
+    data_path_offset = patcher.write_cstring_to_compost(data_path)
+
+    matches = patcher.raw_command_json('/vj str.usr_share_factorio')
+    assert len(matches) > 0, \
+        f'no matches found for /usr/share/factorio: {matches}'
+
+    for match in matches:
+        assert match['type'] == 'hexpair'
+        assert len(match['data']) == 8
+        patcher.raw_command(f'wv4 {data_path_offset} @{match["offset"]}')
+
+
+def patch_write_data_path(patcher: Patcher) -> None:
+    # This is the function which returns fs::path("$HOME/.factorio"), but since
+    # we want to conform to XDG, we need to rewrite that function.
+    fun = patcher.load_function('method.Paths.getSystemWriteData')
+
+    # The function in question uses getpwuid(getuid())->pw_dir, so let's find
+    # out the register that references the value of the home directory.
+    pwuid_call = 'call sym.imp.getpwuid'
+    rest = dropwhile(lambda i: i['disasm'] != pwuid_call, fun['ops'])
+    rest = dropwhile(lambda i: i['esil'].split(',')[1:2] != ['rax'], rest)
+    home_register = next(rest)['esil'].split(',')[4]
+
+    # Just to make sure we inspect this on the next upstream update if the
+    # register has changed.
+    assert home_register == 'r13'
+
+    # Get all the data references of the current function so that we can not
+    # only re-use them but also have a hint on what we might want to rewrite.
+    refs = {}
+    for n, op in enumerate(fun["ops"]):
+        for ref in op.get('refs', []):
+            if ref['type'] != 'DATA':
+                continue
+            refs[n] = patcher.read_cstring(ref['addr'])
+
+    # The disassembly of the operation we want to rewrite.
+    rewrite_pos_info = next(
+        fun['ops'][n] for n, i in refs.items() if i['string'] == '/.factorio'
+    )
+
+    # We want to make sure that RSI contains "/.factorio", since that's the
+    # register we want to rewrite.
+    assert rewrite_pos_info['esil'] == f'{rewrite_pos_info["ptr"]},rsi,='
+
+    # Seek to the rewrite position and memoize the address that comes directly
+    # afterwards, so that we can JMP back accordingly.
+    patcher.address = rewrite_pos_info["offset"]
+    patcher.skip_opcodes()
+    continuation_addr = patcher.address
+
+    # After we're done, we need to restore these registers to the state before
+    # we JMPed into our new code. We *only* restore registers for argument 1,
+    # 4, 5, 6, 7, 8 and the integer return register because the second argument
+    # is one that we actually *want* to override.
+    restore_registers = ['rdi', 'rcx', 'r8', 'r9', 'r10', 'r11', 'rax']
+
+    # Just to make sure we don't accidentally revert a register we write into.
+    assert home_register not in restore_registers
+    init_code = [f'push {reg}' for reg in restore_registers]
+    cleanup_code = [f'pop {reg}' for reg in reversed(restore_registers)]
+    cleanup_code.append(f'jmp {continuation_addr}')
+
+    xdg_fallback = '/.local/share/factorio'
+    xdg_fallback_offset = patcher.write_cstring_to_compost(xdg_fallback)
+
+    # This code is the handler for whenever no XDG_DATA_HOME is defined.
+    noxdg_code_offset = patcher.write_code_to_compost(
+        f'mov rsi, {xdg_fallback_offset}',
+        f'mov rdx, {xdg_fallback_offset + len(xdg_fallback)}',
+        *cleanup_code,
+    )
+
+    xdg_data_home_offset = patcher.write_cstring_to_compost('XDG_DATA_HOME')
+
+    factorio_subpath = '/factorio'
+    factorio_subpath_offset = \
+        patcher.write_cstring_to_compost(factorio_subpath)
+
+    xdg_code_offset = patcher.write_code_to_compost(
+        *init_code,
+
+        f'mov rdi, {xdg_data_home_offset}',
+        'call sym.imp.getenv',
+
+        # If getenv returns NULL, let's jump to the fallback code (using
+        # ~/.local/share) we've written previously.
+        'test rax, rax',
+        f'je {noxdg_code_offset}',
+
+        # Essentially override the home directory with XDG_DATA_HOME and
+        # replace "/.factorio" with "/factorio".
+        f'mov {home_register}, rax',
+        f'mov rsi, {factorio_subpath_offset}',
+        f'mov rdx, {factorio_subpath_offset + len(factorio_subpath)}',
+
+        *cleanup_code,
+    )
+
+    # Finally, let's write the initial JMP back to our handler code.
+    patcher.address = rewrite_pos_info["offset"]
+    patcher.write_code(f'jmp {xdg_code_offset}')
+
+
+if __name__ == '__main__':
+    logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO)
+
+    with Patcher(sys.argv[1]) as patcher:
+        out = Path(os.environ['out'])
+        patch_get_executable_name(patcher, out)
+        patch_read_data_path(patcher, out)
+        patch_write_data_path(patcher)
diff --git a/pkgs/games/gog/factorio/test_patch.py b/pkgs/games/gog/factorio/test_patch.py
new file mode 100644
index 00000000..7d5fad82
--- /dev/null
+++ b/pkgs/games/gog/factorio/test_patch.py
@@ -0,0 +1,210 @@
+import pytest  # type: ignore
+
+from io import StringIO
+from itertools import groupby
+from typing import Set, Tuple, Iterable, Callable, Any
+
+from hypothesis import assume, given  # type: ignore
+import hypothesis.strategies as st  # type: ignore
+
+from patch import Compost
+
+MAX_OFFSET = 1000
+ints = st.integers(min_value=0, max_value=MAX_OFFSET)
+
+
+@given(ranges=st.iterables(st.tuples(ints, ints)))
+def test_compost_free(ranges: Iterable[Tuple[int, int]]) -> None:
+    compost = Compost()
+
+    # Use a completely different (however inefficient) implementation here to
+    # check for overlaps so that we can make sure the actual implementation
+    # isn't "accidentally correct".
+    bitset: Set[int] = set()
+    for start, end in ranges:
+        if end < start:
+            with pytest.raises(ValueError):
+                compost.free(start, end)
+        else:
+            must_fail = False
+            new_bitset = set()
+            for bit in range(start, end + 1):
+                if bit in bitset:
+                    must_fail = True
+                new_bitset.add(bit)
+            if must_fail:
+                with pytest.raises(ValueError):
+                    compost.free(start, end)
+            else:
+                bitset |= new_bitset
+                compost.free(start, end)
+
+    # Allocating from the biggest chunks to the smallest ones needs to always
+    # work and will also make sure that contiguous areas are properly merged.
+    grouped = groupby(range(MAX_OFFSET + 1), bitset.__contains__)
+    chunks = [len(list(chunk)) for valid, chunk in grouped if valid]
+    for size in sorted(chunks, reverse=True):
+        compost.alloc(size)
+
+    # The last allocation is just one byte and it must fail, since we exhausted
+    # all the space we have on the compost heap.
+    with pytest.raises(LookupError):
+        compost.alloc(1)
+
+
+@st.composite
+def valid_ranges(
+    draw: Callable[..., Any],
+    min_chunksize: int = 1,
+    max_chunksize: int = 10000,
+    min_ranges: int = 0,
+    max_ranges: int = 100,
+    allow_gaps: bool = True,
+):
+    gap = st.tuples(st.just(False), st.integers(min_value=0))
+    occupied = st.tuples(st.just(True),
+                         st.integers(min_value=min_chunksize,
+                                     max_value=max_chunksize))
+    elems = st.one_of(gap, occupied) if allow_gaps else occupied
+    sizes = draw(st.iterables(elems, min_size=min_ranges, max_size=max_ranges))
+    ranges = []
+    offset = 0
+    for is_occupied, size in sizes:
+        if is_occupied:
+            ranges.append((offset, offset + size))
+            offset += 1
+        offset += size
+    return ranges
+
+
+@given(ranges=valid_ranges())
+def test_compost_sizes(ranges: Iterable[Tuple[int, int]]) -> None:
+    compost = Compost()
+    total = 0
+    smallest = 0
+    largest = 0
+    for start, end in ranges:
+        size = end - start + 1
+        if smallest == 0 or size < smallest:
+            smallest = size
+        if size > largest:
+            largest = size
+        total += size
+        compost.free(start, end)
+
+    assert compost.available == total
+
+    # Values could have been merged, so the sizes will be at *least* the value
+    # we calculated here in the naive way.
+    assert compost.smallest_chunk >= smallest
+    assert compost.largest_chunk >= largest
+
+
+@given(ranges=valid_ranges(allow_gaps=False, min_ranges=1))
+def test_compost_gapless_alloc(ranges: Iterable[Tuple[int, int]]) -> None:
+    compost = Compost()
+    for start, end in ranges:
+        compost.free(start, end)
+
+    assert compost.available > 0
+    compost.alloc(compost.available)
+    assert compost.available == 0
+
+
+@given(ranges=valid_ranges(max_chunksize=100, min_ranges=1, max_ranges=10))
+def test_compost_alloc_nofrag(ranges: Iterable[Tuple[int, int]]) -> None:
+    compost = Compost()
+    for start, end in ranges:
+        compost.free(start, end)
+
+    total = compost.available
+    for n in range(compost.available):
+        compost.alloc(1)
+        total -= 1
+        assert compost.available == total
+
+    with pytest.raises(LookupError):
+        compost.alloc(1)
+
+
+@given(ranges=valid_ranges(min_chunksize=4, max_chunksize=20, min_ranges=1),
+       allocs=st.iterables(st.integers(min_value=1, max_value=21), min_size=1))
+def test_compost_alloc(ranges: Iterable[Tuple[int, int]],
+                       allocs: Iterable[int]) -> None:
+    compost = Compost()
+    for start, end in ranges:
+        compost.free(start, end)
+
+    for alloc_size in allocs:
+        assume(compost.largest_chunk >= alloc_size)
+        size_before = compost.available
+        compost.alloc(alloc_size)
+        assert size_before - alloc_size == compost.available
+
+
+@given(ranges=valid_ranges(min_chunksize=2, max_chunksize=5,
+                           min_ranges=10, max_ranges=15,
+                           allow_gaps=False),
+       use=st.integers(min_value=-1, max_value=21))
+def test_compost_maybe_alloc(ranges: Iterable[Tuple[int, int]],
+                             use: int) -> None:
+    compost = Compost()
+    for start, end in ranges:
+        compost.free(start, end)
+
+    # This is similar to the one in test_compost_free and it serves as a canary
+    # to make sure we don't accidentally overwrite something we don't want.
+    bitset_before = {i for start, end in compost.offsets
+                     for i in range(1000) if start <= i <= end}
+
+    initial_available = compost.available
+    with compost.maybe_alloc(20) as (_, mark_used):
+        assert initial_available - compost.available == 20
+        assume(0 <= use <= 20)
+        mark_used(use)
+
+    # If the bitset before the allocation is a superset, something is clearly
+    # wrong and we're now in the middle of invalid memory.
+    bitset_afterwards = {i for start, end in compost.offsets
+                         for i in range(1000) if start <= i <= end}
+    assert bitset_before.issuperset(bitset_afterwards)
+
+    assert initial_available - use == compost.available
+
+
+@given(start=st.integers(min_value=0, max_value=100),
+       alloc_sizes=st.iterables(st.integers(min_value=1, max_value=20),
+                                max_size=5))
+def test_compost_writes(start: int, alloc_sizes: Iterable[int]):
+    compost = Compost()
+    compost.free(start, start + 100)
+
+    io = StringIO('.' * 200)
+    alloc_chars = 'abcdefghijkl'
+
+    expected_written = ''
+    for n, asize in enumerate(alloc_sizes):
+        char = alloc_chars[n]
+        offset = compost.alloc(asize)
+        io.seek(offset)
+        expected_written += char * asize
+        io.write(char * asize)
+
+    io.seek(0)
+    assert io.read().startswith('.' * start + expected_written)
+
+
+def test_invalid_compost_calls():
+    compost = Compost()
+
+    with pytest.raises(ValueError):
+        compost.free(10, 9)
+
+    with pytest.raises(ValueError):
+        compost.free(-10, 0)
+
+    with pytest.raises(ValueError):
+        compost.alloc(0)
+
+    with pytest.raises(ValueError):
+        compost.alloc(-10)