about summary refs log tree commit diff
path: root/pkgs/games
diff options
context:
space:
mode:
authoraszlig <aszlig@nix.build>2020-12-28 09:30:45 +0100
committeraszlig <aszlig@nix.build>2020-12-30 16:37:54 +0100
commit2c24ba1d864d98d4c2c3a50501a41b61b43f854a (patch)
tree3b7346ea52c84a34cc66fb94ed97c25c6a0e4ba5 /pkgs/games
parent4f91df5db28a67fda6eca81ae9be7e580e637f98 (diff)
games: Add Factorio
This has been laying around since a few weeks but instead of just using
a simple shell script wrapper (which I had in the first place), I
decided to directly patch the binary during rC3.

The main reason why I went this route was because in the long run we
want to have a generic implementation we can use to patch all sorts of
games, similar to what we have with monogame-patcher.

So the way the *current* patcher roughly works is by allocating an area
called "compost", which is then used as some kind of abstraction for
allocating code and data to be used for the references/logic that we
need to patch.

The latter involves patching XDG_DATA_HOME into the game and changing
the "/usr/share/factorio" path to use the ones within the store path
($out/share/factorio). Fortunately, Factorio already assumes that
everything within /usr/share/factorio (or our path for that matter) is
read-only, so we don't need to add extra code/conditions specifically to
handle that.

Patching both cases is made possible by patching a third location
(get_executable_name), which tries to get the current executable path
via several methods (using /proc/self/exe, running "ps", ...) at
runtime, which in our case is really unnecessary and a perfect
opportunity to replace the function logic by the hardcoded path and
using the rest of the function for our compost area.

While patching might sound straigtforward, I actually introduced two
little (but hard to debug) bugs, where I'm very grateful to all those
folks (you know who you are) at rC3 who were actually following along
and provided helpful input.

In the long term, the goal is to rewrite the patcher with more elaborate
type information (eg. right now function/opcode information is raw JSON)
and generalise it enough that we can use it to get rid of a few other
wrappers.

Signed-off-by: aszlig <aszlig@nix.build>
Diffstat (limited to 'pkgs/games')
-rw-r--r--pkgs/games/gog/default.nix1
-rw-r--r--pkgs/games/gog/factorio/default.nix44
-rw-r--r--pkgs/games/gog/factorio/patch.py440
-rw-r--r--pkgs/games/gog/factorio/test_patch.py210
4 files changed, 695 insertions, 0 deletions
diff --git a/pkgs/games/gog/default.nix b/pkgs/games/gog/default.nix
index b405016a..0061733e 100644
--- a/pkgs/games/gog/default.nix
+++ b/pkgs/games/gog/default.nix
@@ -19,6 +19,7 @@ let
     crosscode = callPackage ./crosscode.nix {};
     dungeons3 = callPackage ./dungeons3.nix {};
     epistory = callPackage ./epistory.nix { };
+    factorio = callPackage ./factorio {};
     freedom-planet = callPackage ./freedom-planet.nix {};
     gibbous = callPackage ./gibbous.nix {};
     graveyard-keeper = callPackage ./graveyard-keeper.nix {};
diff --git a/pkgs/games/gog/factorio/default.nix b/pkgs/games/gog/factorio/default.nix
new file mode 100644
index 00000000..109e03a2
--- /dev/null
+++ b/pkgs/games/gog/factorio/default.nix
@@ -0,0 +1,44 @@
+{ lib, buildGame, fetchGog, alsaLib, libGL, libpulseaudio, xorg
+, python3, python3Packages, runCommand
+}:
+
+buildGame rec {
+  name = "factorio-${version}";
+  version = "1.0.0";
+
+  src = fetchGog {
+    productId = 1238653230;
+    sha256 = "1n6a4qbp9i161jdn0f28ih7zc1blraqk0ypi3pizkc7ddc7bb7ja";
+  };
+
+  nativeBuildInputs = [ python3 python3Packages.r2pipe ];
+
+  buildInputs = [
+    alsaLib libGL libpulseaudio xorg.libICE xorg.libSM xorg.libX11
+    xorg.libXcursor xorg.libXext xorg.libXinerama xorg.libXrandr
+  ];
+
+  postPatch = "python3 ${lib.escapeShellArg (runCommand "patcher" {
+    src = lib.sourceFilesBySuffices ./. [ ".py" ];
+    nativeBuildInputs = [
+      python3Packages.mypy
+      python3Packages.flake8
+      python3Packages.pytest
+      python3Packages.hypothesis
+      python3Packages.r2pipe
+    ];
+  } ''
+    pytest "$src" -o "cache_dir=$PWD"
+    mypy "$src"
+    flake8 "$src"
+    install -vD -m 0644 "$src/patch.py" "$out"
+  '')} bin/x64/factorio";
+
+  installPhase = ''
+    install -vD bin/x64/factorio "$out/bin/factorio"
+    mkdir -p "$out/share"
+    cp -rT data "$out/share/factorio"
+  '';
+
+  sandbox.paths.required = [ "$XDG_DATA_HOME/factorio" ];
+}
diff --git a/pkgs/games/gog/factorio/patch.py b/pkgs/games/gog/factorio/patch.py
new file mode 100644
index 00000000..9ffd9bb6
--- /dev/null
+++ b/pkgs/games/gog/factorio/patch.py
@@ -0,0 +1,440 @@
+import os
+import sys
+import logging
+
+from contextlib import contextmanager
+from itertools import dropwhile
+from pathlib import Path
+from typing import Dict, List, Tuple, Any, Optional, ContextManager, \
+                   Generator, Callable
+from r2pipe import open as R2Pipe  # type: ignore
+
+
+class Compost:
+    """
+    Represents non-overlapping areas of the binary that can be re-used
+    to store additional data and code. In a high level way, this is a similar
+    interface to 'malloc' and 'free', where we use free() to dispose something
+    and alloc() to get something from the (compost) heap.
+    """
+    def __init__(self):
+        self.offsets: List[Tuple[int, int]] = []
+
+    @property
+    def available(self) -> int:
+        """
+        The total amount of data available for re-use, not necessarily
+        contiguous.
+        """
+        return sum(end + 1 - start for start, end in self.offsets)
+
+    @property
+    def largest_chunk(self) -> int:
+        """
+        The largest chunk of contiguous amount data that is avaliable for
+        re-use.
+        """
+        if self.offsets:
+            return self.offsets[-1][1] - self.offsets[-1][0] + 1
+        return 0
+
+    @property
+    def smallest_chunk(self) -> int:
+        """
+        The smallest chunk of contiguous amount data that is avaliable for
+        re-use.
+        """
+        if self.offsets:
+            return self.offsets[0][1] - self.offsets[0][0] + 1
+        return 0
+
+    def free(self, start: int, end: int) -> None:
+        """
+        Add the given byte range to the space that can be reused later.
+        """
+        if start < 0:
+            raise ValueError("Start offset needs to positive.")
+        if end < start:
+            msg = "End offset needs to be larger than start offset."
+            raise ValueError(msg)
+
+        to_delete = []
+        for n, existing in enumerate(self.offsets):
+            existing_start, existing_end = existing
+
+            if start <= existing_end and existing_start <= end:
+                wanted = start, end
+                msg = f"Range {wanted!r} is overlapping with {existing!r}."
+                raise ValueError(msg)
+
+            # If the offsets follow or precede existing ranges, merge them.
+            if existing_end + 1 == start:
+                start = existing_start
+                to_delete.append(n)
+            if end + 1 == existing_start:
+                end = existing_end
+                to_delete.append(n)
+        for n in to_delete:
+            del self.offsets[n]
+
+        self.offsets.append((start, end))
+        self.offsets.sort(key=lambda o: o[1] - o[0])
+
+    def alloc(self, size: int) -> int:
+        """
+        Get `size` bytes of data for re-use, mark them as used and return its
+        offset.
+        """
+        if size < 1:
+            raise ValueError("Allocation size has to be at least 1 byte.")
+        for n, (start, end) in enumerate(self.offsets):
+            if end - start + 1 >= size:
+                new_start = start + size
+                self.offsets[n] = (new_start, end)
+                if new_start == end + 1:
+                    del self.offsets[n]
+                return start
+        if size == 1:
+            msg = f"Unable to find space for one byte in {self.offsets!r}."
+        else:
+            msg = f"Unable to find space for {{}} bytes in {self.offsets!r}."
+        raise LookupError(msg.format(size))
+
+    @contextmanager
+    def maybe_alloc(
+        self, size: int
+    ) -> Generator[Tuple[int, Callable[[int], None]], None, None]:
+        """
+        Allocate at least `size` bytes and pass the offset and a callback to
+        the context. The offset is the start address of the allocated bytes and
+        the callback accepts a single argument denoting how many bytes were
+        actually used. After leaving the context, the excess bytes are freed
+        back to the compost.
+        """
+        offset = self.alloc(size)
+        used = 0
+
+        def _mark_used(used_size: int):
+            nonlocal used
+            if used_size < 0 or used_size > size:
+                raise ValueError(f"Tried to mark {used_size} bytes as used,"
+                                 f" but only {size} bytes are available.")
+            used = used_size
+
+        try:
+            yield offset, _mark_used
+        finally:
+            if used < size:
+                start = offset + used
+                self.free(start, offset + size - 1)
+
+
+class Patcher(ContextManager['Patcher']):
+    compost: Compost
+
+    def __init__(self, filename: str):
+        self.filename = filename
+        self.__r2: Optional[R2Pipe] = None
+        self.__addr: Optional[int] = None
+        self.compost = Compost()
+
+    def __enter__(self) -> 'Patcher':
+        self.__r2 = R2Pipe(self.filename, ['-w'])
+        logging.info(f'Analysing program {self.filename}.')
+        self.raw_command('aa')
+        return self
+
+    def __exit__(self, *exc_details) -> None:
+        assert self.__r2 is not None
+        self.__r2.quit()
+
+    def raw_command(self, cmd: str) -> str:
+        assert self.__r2 is not None
+        return self.__r2.cmd(cmd)
+
+    def raw_command_json(self, cmd: str) -> Any:
+        assert self.__r2 is not None
+        return self.__r2.cmdj(cmd)
+
+    @property
+    def address(self) -> int:
+        if self.__addr is None:
+            self.__addr = self.raw_command_json('?vi $$')
+        return self.__addr
+
+    @address.setter
+    def address(self, addr: int) -> None:
+        self.raw_command(f's {addr}')
+        self.__addr = addr
+
+    def __invalidate_address(self):
+        self.__addr = None
+
+    @contextmanager
+    def at_address(self, addr: int) -> Generator[None, None, None]:
+        """
+        Seek to given address and restore position after leaving context.
+        """
+        oldaddr = self.address
+        self.address = addr
+        try:
+            yield
+        finally:
+            self.address = oldaddr
+
+    def get_current_opcode_size(self, count: int = 1) -> int:
+        """
+        Return the size in bytes for the next `count` opcodes.
+        """
+        return self.raw_command_json(f'aos {count}')
+
+    def get_assembled_size(self, *opcodes: str) -> int:
+        """
+        Return the size in bytes of the given opcodes. Note that JMP locations
+        may be position-dependent, so you have to seek to the address first if
+        you want this to be reliable.
+        """
+        result = self.raw_command(f'"wa* {";".join(opcodes)}"')
+        assert result.startswith('wx ')
+        return len(result[3:].strip()) >> 1
+
+    def write_code(self, *opcodes: str) -> int:
+        """
+        Write the specified opcodes and return the written length in bytes.
+        """
+        oldaddr = self.address
+        logging.info(f'Writing assembly of {opcodes!r} to address'
+                     f' 0x{oldaddr:x}.')
+        self.raw_command(f'"wa {";".join(opcodes)}"')
+        self.raw_command(f'so {len(opcodes)}')
+        self.__invalidate_address()
+        return self.address - oldaddr
+
+    def replace_code(self, *opcodes: str, count: int = 1) -> int:
+        """
+        Write the specified opcodes over the next `count` opcodes at the
+        current position with padding and return the written length in bytes.
+        """
+        current_size = self.get_current_opcode_size(count)
+        expected_size = self.get_assembled_size(*opcodes)
+        assert expected_size <= current_size
+        padsize = current_size - expected_size
+        nops = ['nop'] * padsize
+        logging.info(f'Replacing {count} opcodes ({current_size} bytes)'
+                     f' at address 0x{self.address} with assembly'
+                     f' {opcodes!r} (padded with {padsize} nops).')
+        written = self.write_code(*opcodes, *nops)
+        assert written == current_size, \
+            f"Wanted to replace {count} opcodes consisting of {current_size}" \
+            f" bytes, however we've written {written} bytes instead."
+        return written
+
+    def write_code_to_compost(self, *opcodes: str) -> int:
+        """
+        Write the specified opcodes to the `Compost` area and return the
+        offset.
+        """
+        estimated = self.get_assembled_size(*opcodes)
+        with self.compost.maybe_alloc(estimated + 20) as (offset, mark_used):
+            with self.at_address(offset):
+                logging.info(f'Writing assembly {opcodes!r} to compost at'
+                             f' address 0x{offset:x}.')
+                size = self.get_assembled_size(*opcodes)
+                written = self.write_code(*opcodes)
+                mark_used(written)
+                assert written == size, \
+                    f"Assembled size for {opcodes!r} is {size} bytes," \
+                    f" but the actual code size is {written} bytes."
+            return offset
+
+    def write_cstring(self, value: str) -> int:
+        """
+        Write the specified C-String and return the length in bytes (which
+        includes the NUL byte).
+        """
+        logging.info(f'Writing C-String {value!r} to address'
+                     f' 0x{self.address:x}.')
+        self.raw_command(f'w {value}\\0')
+        size = len(value) + 1
+        self.raw_command(f's+ {size}')
+        self.address += size
+        return size
+
+    def write_cstring_to_compost(self, value: str) -> int:
+        """
+        Write the specified C-String to the `Compost` area and return its
+        offset.
+        """
+        size = len(value) + 1
+        offset = self.compost.alloc(size)
+        with self.at_address(offset):
+            logging.info(f'Writing C-String {value!r} to compost at'
+                         f' address 0x{offset:x}.')
+            self.raw_command(f'w {value}\\0')
+        return offset
+
+    def read_cstring(self, addr: int) -> Dict[Any, Any]:
+        """
+        Read the C-String at the specified `addr` and return its information.
+        """
+        return self.raw_command_json(f'pszj @{addr}')
+
+    def load_function(self, name: str) -> Dict[Any, Any]:
+        self.raw_command(f'sf {name}')
+        fun = self.raw_command_json('pdfj')
+        assert fun['name'] == name
+        self.__invalidate_address()
+        return fun
+
+    def skip_opcodes(self, amount: int = 1) -> int:
+        "Skip the specified amount of opcodes and return the total length."
+        oldaddr = self.address
+        self.raw_command(f'so {amount}')
+        self.__invalidate_address()
+        return self.address - oldaddr
+
+
+def patch_get_executable_name(patcher: Patcher, out: Path) -> None:
+    # We'll use the get_executable_name() function to not only hardcode the
+    # executable path but also use the unused part for static data referenced
+    # by other functions.
+    fun = patcher.load_function('sym.get_executable_name')
+    offset = fun["ops"][0]["offset"]
+
+    # Let's just assume 32 bytes, which is enough to hopefully fit in the code.
+    executable_offset = offset + 32
+
+    # This makes sure that get_executable_name always returns the binary at
+    # $out/bin/factorio, since we don't need to do all kinds of shenanigans at
+    # runtime when using Nix.
+    size = patcher.write_code(
+        f'mov edi, {executable_offset}',
+        'call sym.al_create_path',
+        'ret',
+    )
+    assert size < 32, "Replacement code for get_executable_name " \
+                      f"would overflow into compost area ({size} >= 32)."
+
+    executable = str(out / 'bin' / 'factorio')
+    patcher.address = executable_offset
+    size = patcher.write_cstring(executable)
+
+    # TODO: End should be the *real* end, not the offset of the last opcode!
+    patcher.compost.free(executable_offset + size, fun["ops"][-1]["offset"])
+
+
+def patch_read_data_path(patcher: Patcher, out: Path) -> None:
+    data_path = str(out / 'share' / 'factorio')
+    data_path_offset = patcher.write_cstring_to_compost(data_path)
+
+    matches = patcher.raw_command_json('/vj str.usr_share_factorio')
+    assert len(matches) > 0, \
+        f'no matches found for /usr/share/factorio: {matches}'
+
+    for match in matches:
+        assert match['type'] == 'hexpair'
+        assert len(match['data']) == 8
+        patcher.raw_command(f'wv4 {data_path_offset} @{match["offset"]}')
+
+
+def patch_write_data_path(patcher: Patcher) -> None:
+    # This is the function which returns fs::path("$HOME/.factorio"), but since
+    # we want to conform to XDG, we need to rewrite that function.
+    fun = patcher.load_function('method.Paths.getSystemWriteData')
+
+    # The function in question uses getpwuid(getuid())->pw_dir, so let's find
+    # out the register that references the value of the home directory.
+    pwuid_call = 'call sym.imp.getpwuid'
+    rest = dropwhile(lambda i: i['disasm'] != pwuid_call, fun['ops'])
+    rest = dropwhile(lambda i: i['esil'].split(',')[1:2] != ['rax'], rest)
+    home_register = next(rest)['esil'].split(',')[4]
+
+    # Just to make sure we inspect this on the next upstream update if the
+    # register has changed.
+    assert home_register == 'r13'
+
+    # Get all the data references of the current function so that we can not
+    # only re-use them but also have a hint on what we might want to rewrite.
+    refs = {}
+    for n, op in enumerate(fun["ops"]):
+        for ref in op.get('refs', []):
+            if ref['type'] != 'DATA':
+                continue
+            refs[n] = patcher.read_cstring(ref['addr'])
+
+    # The disassembly of the operation we want to rewrite.
+    rewrite_pos_info = next(
+        fun['ops'][n] for n, i in refs.items() if i['string'] == '/.factorio'
+    )
+
+    # We want to make sure that RSI contains "/.factorio", since that's the
+    # register we want to rewrite.
+    assert rewrite_pos_info['esil'] == f'{rewrite_pos_info["ptr"]},rsi,='
+
+    # Seek to the rewrite position and memoize the address that comes directly
+    # afterwards, so that we can JMP back accordingly.
+    patcher.address = rewrite_pos_info["offset"]
+    patcher.skip_opcodes()
+    continuation_addr = patcher.address
+
+    # After we're done, we need to restore these registers to the state before
+    # we JMPed into our new code. We *only* restore registers for argument 1,
+    # 4, 5, 6, 7, 8 and the integer return register because the second argument
+    # is one that we actually *want* to override.
+    restore_registers = ['rdi', 'rcx', 'r8', 'r9', 'r10', 'r11', 'rax']
+
+    # Just to make sure we don't accidentally revert a register we write into.
+    assert home_register not in restore_registers
+    init_code = [f'push {reg}' for reg in restore_registers]
+    cleanup_code = [f'pop {reg}' for reg in reversed(restore_registers)]
+    cleanup_code.append(f'jmp {continuation_addr}')
+
+    xdg_fallback = '/.local/share/factorio'
+    xdg_fallback_offset = patcher.write_cstring_to_compost(xdg_fallback)
+
+    # This code is the handler for whenever no XDG_DATA_HOME is defined.
+    noxdg_code_offset = patcher.write_code_to_compost(
+        f'mov rsi, {xdg_fallback_offset}',
+        f'mov rdx, {xdg_fallback_offset + len(xdg_fallback)}',
+        *cleanup_code,
+    )
+
+    xdg_data_home_offset = patcher.write_cstring_to_compost('XDG_DATA_HOME')
+
+    factorio_subpath = '/factorio'
+    factorio_subpath_offset = \
+        patcher.write_cstring_to_compost(factorio_subpath)
+
+    xdg_code_offset = patcher.write_code_to_compost(
+        *init_code,
+
+        f'mov rdi, {xdg_data_home_offset}',
+        'call sym.imp.getenv',
+
+        # If getenv returns NULL, let's jump to the fallback code (using
+        # ~/.local/share) we've written previously.
+        'test rax, rax',
+        f'je {noxdg_code_offset}',
+
+        # Essentially override the home directory with XDG_DATA_HOME and
+        # replace "/.factorio" with "/factorio".
+        f'mov {home_register}, rax',
+        f'mov rsi, {factorio_subpath_offset}',
+        f'mov rdx, {factorio_subpath_offset + len(factorio_subpath)}',
+
+        *cleanup_code,
+    )
+
+    # Finally, let's write the initial JMP back to our handler code.
+    patcher.address = rewrite_pos_info["offset"]
+    patcher.write_code(f'jmp {xdg_code_offset}')
+
+
+if __name__ == '__main__':
+    logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO)
+
+    with Patcher(sys.argv[1]) as patcher:
+        out = Path(os.environ['out'])
+        patch_get_executable_name(patcher, out)
+        patch_read_data_path(patcher, out)
+        patch_write_data_path(patcher)
diff --git a/pkgs/games/gog/factorio/test_patch.py b/pkgs/games/gog/factorio/test_patch.py
new file mode 100644
index 00000000..7d5fad82
--- /dev/null
+++ b/pkgs/games/gog/factorio/test_patch.py
@@ -0,0 +1,210 @@
+import pytest  # type: ignore
+
+from io import StringIO
+from itertools import groupby
+from typing import Set, Tuple, Iterable, Callable, Any
+
+from hypothesis import assume, given  # type: ignore
+import hypothesis.strategies as st  # type: ignore
+
+from patch import Compost
+
+MAX_OFFSET = 1000
+ints = st.integers(min_value=0, max_value=MAX_OFFSET)
+
+
+@given(ranges=st.iterables(st.tuples(ints, ints)))
+def test_compost_free(ranges: Iterable[Tuple[int, int]]) -> None:
+    compost = Compost()
+
+    # Use a completely different (however inefficient) implementation here to
+    # check for overlaps so that we can make sure the actual implementation
+    # isn't "accidentally correct".
+    bitset: Set[int] = set()
+    for start, end in ranges:
+        if end < start:
+            with pytest.raises(ValueError):
+                compost.free(start, end)
+        else:
+            must_fail = False
+            new_bitset = set()
+            for bit in range(start, end + 1):
+                if bit in bitset:
+                    must_fail = True
+                new_bitset.add(bit)
+            if must_fail:
+                with pytest.raises(ValueError):
+                    compost.free(start, end)
+            else:
+                bitset |= new_bitset
+                compost.free(start, end)
+
+    # Allocating from the biggest chunks to the smallest ones needs to always
+    # work and will also make sure that contiguous areas are properly merged.
+    grouped = groupby(range(MAX_OFFSET + 1), bitset.__contains__)
+    chunks = [len(list(chunk)) for valid, chunk in grouped if valid]
+    for size in sorted(chunks, reverse=True):
+        compost.alloc(size)
+
+    # The last allocation is just one byte and it must fail, since we exhausted
+    # all the space we have on the compost heap.
+    with pytest.raises(LookupError):
+        compost.alloc(1)
+
+
+@st.composite
+def valid_ranges(
+    draw: Callable[..., Any],
+    min_chunksize: int = 1,
+    max_chunksize: int = 10000,
+    min_ranges: int = 0,
+    max_ranges: int = 100,
+    allow_gaps: bool = True,
+):
+    gap = st.tuples(st.just(False), st.integers(min_value=0))
+    occupied = st.tuples(st.just(True),
+                         st.integers(min_value=min_chunksize,
+                                     max_value=max_chunksize))
+    elems = st.one_of(gap, occupied) if allow_gaps else occupied
+    sizes = draw(st.iterables(elems, min_size=min_ranges, max_size=max_ranges))
+    ranges = []
+    offset = 0
+    for is_occupied, size in sizes:
+        if is_occupied:
+            ranges.append((offset, offset + size))
+            offset += 1
+        offset += size
+    return ranges
+
+
+@given(ranges=valid_ranges())
+def test_compost_sizes(ranges: Iterable[Tuple[int, int]]) -> None:
+    compost = Compost()
+    total = 0
+    smallest = 0
+    largest = 0
+    for start, end in ranges:
+        size = end - start + 1
+        if smallest == 0 or size < smallest:
+            smallest = size
+        if size > largest:
+            largest = size
+        total += size
+        compost.free(start, end)
+
+    assert compost.available == total
+
+    # Values could have been merged, so the sizes will be at *least* the value
+    # we calculated here in the naive way.
+    assert compost.smallest_chunk >= smallest
+    assert compost.largest_chunk >= largest
+
+
+@given(ranges=valid_ranges(allow_gaps=False, min_ranges=1))
+def test_compost_gapless_alloc(ranges: Iterable[Tuple[int, int]]) -> None:
+    compost = Compost()
+    for start, end in ranges:
+        compost.free(start, end)
+
+    assert compost.available > 0
+    compost.alloc(compost.available)
+    assert compost.available == 0
+
+
+@given(ranges=valid_ranges(max_chunksize=100, min_ranges=1, max_ranges=10))
+def test_compost_alloc_nofrag(ranges: Iterable[Tuple[int, int]]) -> None:
+    compost = Compost()
+    for start, end in ranges:
+        compost.free(start, end)
+
+    total = compost.available
+    for n in range(compost.available):
+        compost.alloc(1)
+        total -= 1
+        assert compost.available == total
+
+    with pytest.raises(LookupError):
+        compost.alloc(1)
+
+
+@given(ranges=valid_ranges(min_chunksize=4, max_chunksize=20, min_ranges=1),
+       allocs=st.iterables(st.integers(min_value=1, max_value=21), min_size=1))
+def test_compost_alloc(ranges: Iterable[Tuple[int, int]],
+                       allocs: Iterable[int]) -> None:
+    compost = Compost()
+    for start, end in ranges:
+        compost.free(start, end)
+
+    for alloc_size in allocs:
+        assume(compost.largest_chunk >= alloc_size)
+        size_before = compost.available
+        compost.alloc(alloc_size)
+        assert size_before - alloc_size == compost.available
+
+
+@given(ranges=valid_ranges(min_chunksize=2, max_chunksize=5,
+                           min_ranges=10, max_ranges=15,
+                           allow_gaps=False),
+       use=st.integers(min_value=-1, max_value=21))
+def test_compost_maybe_alloc(ranges: Iterable[Tuple[int, int]],
+                             use: int) -> None:
+    compost = Compost()
+    for start, end in ranges:
+        compost.free(start, end)
+
+    # This is similar to the one in test_compost_free and it serves as a canary
+    # to make sure we don't accidentally overwrite something we don't want.
+    bitset_before = {i for start, end in compost.offsets
+                     for i in range(1000) if start <= i <= end}
+
+    initial_available = compost.available
+    with compost.maybe_alloc(20) as (_, mark_used):
+        assert initial_available - compost.available == 20
+        assume(0 <= use <= 20)
+        mark_used(use)
+
+    # If the bitset before the allocation is a superset, something is clearly
+    # wrong and we're now in the middle of invalid memory.
+    bitset_afterwards = {i for start, end in compost.offsets
+                         for i in range(1000) if start <= i <= end}
+    assert bitset_before.issuperset(bitset_afterwards)
+
+    assert initial_available - use == compost.available
+
+
+@given(start=st.integers(min_value=0, max_value=100),
+       alloc_sizes=st.iterables(st.integers(min_value=1, max_value=20),
+                                max_size=5))
+def test_compost_writes(start: int, alloc_sizes: Iterable[int]):
+    compost = Compost()
+    compost.free(start, start + 100)
+
+    io = StringIO('.' * 200)
+    alloc_chars = 'abcdefghijkl'
+
+    expected_written = ''
+    for n, asize in enumerate(alloc_sizes):
+        char = alloc_chars[n]
+        offset = compost.alloc(asize)
+        io.seek(offset)
+        expected_written += char * asize
+        io.write(char * asize)
+
+    io.seek(0)
+    assert io.read().startswith('.' * start + expected_written)
+
+
+def test_invalid_compost_calls():
+    compost = Compost()
+
+    with pytest.raises(ValueError):
+        compost.free(10, 9)
+
+    with pytest.raises(ValueError):
+        compost.free(-10, 0)
+
+    with pytest.raises(ValueError):
+        compost.alloc(0)
+
+    with pytest.raises(ValueError):
+        compost.alloc(-10)