
Generating embeddings for blog semantic search

The source of truth for the blog’s content is its markdown files; locally, I read the blog as markdown rendered in Neovim.

Hugo builds the HTML files from the markdown, and those HTML files are what’s served on the web. I haven’t yet decided if the content that’s used to generate embeddings should be extracted from the markdown or the HTML files. (Edit (Jan, 2026): markdown files are the source of truth, but the generated HTML files are being used as the source for the embeddings.)
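
With Hugo’s default “pretty URL” output, each markdown file maps to a generated index.html; for example (the post is one of mine):

content/notes/roger-bacon-as-magician.md
    → public/notes/roger-bacon-as-magician/index.html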

The postchunker library

.
├── __init__.py  # makes the directory an importable package
├── postchunker.py  # generate HTML fragments and text chunks

See Parsing HTML files with lxml and Learning about parsing XML and HTML with lxml for details about lxml and XML in general.

The code, with light comments. There was a bit of trial and error in the approach; it’s “good enough”, but I’ll look at it again soon.

# postchunker.py

from lxml import etree, html
from lxml.html import HtmlElement

__all__ = ["extract_sections"]


def serialize(fragment: HtmlElement, pretty_print: bool = False) -> str:
    # serialize an element back to an HTML string
    return html.tostring(
        fragment,
        pretty_print=pretty_print,
        method="html",
        encoding="unicode",
    )


def heading_link(
    original_heading: HtmlElement, headings_path: list[str], rel_path: str
):
    # build an <h2><a> element that links back to the section on the site,
    # labelled with the full headings path
    href = f"/{rel_path}"
    heading_id = original_heading.attrib.get("id")
    if heading_id:
        href = f"{href}#{heading_id}"

    anchor = etree.Element("a", {"href": href})
    anchor.text = " > ".join(headings_path)
    heading = etree.Element("h2")
    heading.append(anchor)

    return heading


def get_heading_level(tag: str) -> int:
    # maps h1-h6 to 0-5; used to truncate headings_path when a new heading starts
    heading_levels = {"h1": 0, "h2": 1, "h3": 2, "h4": 3, "h5": 4, "h6": 5}
    return heading_levels[tag]


def exclude_element(element: HtmlElement) -> bool:
    # skip footnote and taxonomy-terms containers (note: this is an exact
    # class match, so elements with multiple classes slip through)
    if element.get("class") == "footnotes":
        return True

    if element.get("class") == "terms":
        return True

    # do something better with this
    if element.tag == "time":
        return True

    return False


# TODO: clean up; the element isn't an HtmlElement, it's an etree.Element?
def fix_relative_links(element: HtmlElement, rel_path: str):
    # rewrite in-page fragment links ("#foo") as absolute post URLs ("/rel_path#foo")
    for e in element.iter("a"):
        href = e.attrib.get("href")  # .get: not every anchor has an href
        if href and href.startswith("#"):
            e.attrib["href"] = f"/{rel_path}{href}"

    return element


def has_text(element: HtmlElement) -> bool:
    # True if the element (or any descendant) contains non-whitespace text
    return bool("".join(element.itertext()).strip())


def section_texts(section: HtmlElement, headings_path: list[str]):
    # flatten the section's paragraphs and code blocks into chunks of text for
    # embedding, each prefixed with the headings path
    section_heading = " > ".join(headings_path) + ": "
    section_heading_length = len(section_heading)
    texts = []
    for element in section.iterchildren():
        if element.tag == "p":
            text = "".join(element.itertext())
            # remove unnecessary newline characters
            text = " ".join(text.splitlines())
            texts.append({"tag": "p", "text": text})
        elif element.attrib.get("class") == "highlight":
            # Hugo wraps highlighted code blocks in a div.highlight
            code_element = element.find(".//code")
            if code_element is not None:
                lang = code_element.get("class")
                code = f"({lang}):\n"
                for line in code_element.iterchildren():
                    code += "".join(line.itertext())

                # keep a code block attached to the paragraph that introduces it
                if texts and texts[-1].get("tag") == "p":
                    last_paragraph = texts[-1]["text"]
                    texts[-1] = {"tag": "code", "text": f"{last_paragraph}\n{code}"}
                else:
                    texts.append({"tag": "code", "text": code})

    # group entries into chunks of roughly 256 words; the heading length (in
    # characters) is subtracted as a rough budget for the prefix
    sections = []
    word_count = 0
    current_section = ""
    for entry in texts:
        word_count += len(entry["text"].split())
        if word_count < (256 - section_heading_length) and entry["tag"] != "code":
            if current_section:
                current_section += f"\n{entry['text']}"
            else:
                current_section = entry["text"]
        else:
            if current_section:
                sections.append(section_heading + current_section)
            current_section = entry["text"]
            # start the next chunk's count with the entry that begins it
            word_count = len(entry["text"].split())

    if current_section:
        sections.append(section_heading + current_section)

    return sections
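
# Example: with headings_path ["Post title", "Section"], two short paragraphs
# come back as one chunk ("Post title > Section: para one\npara two"); a code
# entry always flushes the current chunk and starts a new one.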


def extract_sections(filename: str, rel_path: str):
    # walk the post's <article>, splitting its children into sections at each
    # heading; every section gets an HTML fragment, a linked heading, the path
    # of headings above it, and text chunks for embedding
    tree = html.parse(filename)
    root = tree.find(".//article")
    if root is None:
        return []

    heading_tags = ("h1", "h2", "h3", "h4", "h5", "h6")
    sections = []
    current_heading = None
    current_fragment = None
    headings_path = []

    def flush_section():
        # fix in-page links, then serialize the finished fragment and heading
        fragment = fix_relative_links(current_fragment, rel_path)
        sections.append(
            {
                "html_fragment": serialize(fragment, pretty_print=False),
                "html_heading": serialize(current_heading, pretty_print=False),
                "headings_path": headings_path,
                "embeddings_text": section_texts(fragment, headings_path),
            }
        )

    for child in root.iterchildren():
        if child.tag in heading_tags:
            if current_fragment is not None and has_text(current_fragment):
                flush_section()

            current_fragment = etree.Element("div", {"class": "article-fragment"})
            heading_level = get_heading_level(child.tag)
            # itertext covers headings that contain inline markup
            heading_text = "".join(child.itertext()).strip()
            headings_path = headings_path[:heading_level] + [heading_text]
            current_heading = heading_link(child, headings_path, rel_path)

        elif current_fragment is not None:
            if not exclude_element(child):
                current_fragment.append(child)

    if current_fragment is not None and has_text(current_fragment):
        flush_section()

    return sections
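
A quick way to see what extract_sections returns is to point it at one of the generated files; a sketch using a post from my setup (the same test paths show up further down):

from postchunker import extract_sections

sections = extract_sections(
    "/home/scossar/zalgorithm/public/notes/roger-bacon-as-magician/index.html",
    "notes/roger-bacon-as-magician",
)
for section in sections:
    print(section["headings_path"])
    for chunk in section["embeddings_text"]:
        print(chunk[:80])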

Generating embeddings

Read through the local Hugo content directories, find the associated index.html file for each markdown file, and generate embeddings for its sections. The embeddings are saved locally, using the Chroma persistent client.

# generate_embeddings.py

import frontmatter
import chromadb
from chromadb import Collection
import re
import unidecode

from postchunker import extract_sections
from pathlib import Path

# NOTE: see https://docs.trychroma.com/docs/embeddings/embedding-functions for
# details about custom embedding functions, e.g. creating one for
# "all-mpnet-base-v2".
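
# A minimal sketch of plugging in "all-mpnet-base-v2" via chromadb's
# sentence-transformers helper (assumes the sentence-transformers package is
# installed; not wired into the class below):
#
# from chromadb.utils import embedding_functions
#
# mpnet_ef = embedding_functions.SentenceTransformerEmbeddingFunction(
#     model_name="all-mpnet-base-v2"
# )
# collection = chroma_client.get_or_create_collection(
#     name="zalgorithm", embedding_function=mpnet_ef
# )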


class EmbeddingGenerator:
    def __init__(
        self,
        content_directory: str = "/home/scossar/zalgorithm/content",
        html_directory: str = "/home/scossar/zalgorithm/public",
        collection_name: str = "zalgorithm",
    ):
        self.skip_dirs: set[str] = {  # these are mostly wrong
            "node_modules",
            ".git",
            ".obsidian",
            "__pycache__",
            "venv",
            ".venv",
        }
        self.collection_name = collection_name
        # chroma will use the default `chroma` directory in the base of the project for persistence
        self.chroma_client = chromadb.PersistentClient()
        self.collection = self.get_or_create_collection()
        self.content_directory = content_directory
        self.html_directory = html_directory

    def get_or_create_collection(self) -> Collection:
        return self.chroma_client.get_or_create_collection(name=self.collection_name)

    # TODO: this is kind of chaotic; remember it's passed markdown files, not HTML files;
    # either respect a 'draft' frontmatter boolean, or a 'private' boolean
    def _should_process_file(self, filepath: Path) -> bool:
        if any(part.startswith(".") for part in filepath.parts):
            return False
        if any(part.startswith("_") for part in filepath.parts):
            return False
        if any(skip_dir in filepath.parts for skip_dir in self.skip_dirs):
            return False
        if filepath.suffix.lower() not in (".md", ".markdown"):
            return False
        if filepath.name == "search.md":
            return False
        return True

    # Hoping this matches Hugo's slugify implementation
    # (e.g. "Déjà Vu!" becomes "deja-vu")
    def _slugify(self, title: str) -> str:
        title = unidecode.unidecode(title).lower()
        title = re.sub(r"[^a-z0-9\s-]", "", title)
        title = re.sub(r"[\s_]+", "-", title)
        title = title.strip("-")  # strip leading/trailing hyphens
        return title

    def _is_up_to_date(self, file_id: str, file_mtime: float) -> bool:
        existing = self.collection.get(ids=file_id, limit=1)

        if not existing["ids"] or not existing["metadatas"]:
            return False

        last_updated_at = existing["metadatas"][0].get("updated_at", 0)

        if not isinstance(last_updated_at, (int, float)):
            return False  # if it's an invalid timestamp

        # 1 second tolerance for rounding errors
        return last_updated_at + 1.0 >= file_mtime

    def generate_embeddings(self):
        """
        Generate embeddings for blog content
        """
        for path in Path(self.content_directory).rglob("*"):
            if not self._should_process_file(path):
                continue
            self.generate_embedding(path)

    def get_file_paths(self, md_path: Path) -> tuple[str, str] | tuple[None, None]:
        try:
            rel_path = md_path.relative_to(self.content_directory)
        except ValueError:  # if md_path isn't a subpath of content_directory
            print(
                f"{md_path} isn't relative to the content directory ({self.content_directory})"
            )
            return None, None

        rel_path_parts = rel_path.with_suffix("").parts
        rel_path_parts = tuple(
            s.lower() for s in rel_path_parts
        )  # it's possible to end up with an uppercase md filename
        html_path = Path(self.html_directory) / Path(*rel_path_parts) / "index.html"

        if html_path.exists():
            return str(html_path), str(Path(*rel_path_parts))
        else:
            print(f"No file exists at {html_path}")
            return None, None

    def generate_embedding(self, filepath: Path) -> None:
        html_path, relative_path = self.get_file_paths(filepath)
        if not html_path or not relative_path:
            return None

        print(f"Processing {relative_path}")

        post = frontmatter.load(str(filepath))
        file_mtime = filepath.stat().st_mtime
        title = str(post.get("title"))
        post_id = post.get("id", None)
        if not post_id:
            print(
                f"The post '{title}' is missing an 'id' field. Skipping generating an embedding."
            )
            return None

        sections = extract_sections(html_path, relative_path)

        for section in sections:
            html_fragment = section["html_fragment"]
            html_heading = section["html_heading"]
            page_heading = section["headings_path"][0]
            section_heading = section["headings_path"][-1]
            section_heading_slug = self._slugify(section_heading)
            embeddings_text = section["embeddings_text"]

            for index, text in enumerate(embeddings_text):
                embedding_id = f"{post_id}-{index}-{section_heading_slug}"
                # TODO:  (maybe) uncomment after testing
                # if self._is_up_to_date(embedding_id, file_mtime):
                #     print(f"Not indexing {title}. Up to date.")
                #     return None

                metadatas = {
                    "page_title": page_heading,
                    "section_heading": section_heading,
                    "html_heading": html_heading,
                    "html_fragment": html_fragment,
                    "updated_at": file_mtime,
                }

                self.collection.upsert(
                    ids=embedding_id, metadatas=metadatas, documents=text
                )

    # for testing
    def query_collection(self, query: str):
        results = self.collection.query(
            query_texts=[query],
            n_results=7,
            include=["metadatas", "documents", "distances"],
        )

        if not (results["metadatas"] and results["documents"] and results["distances"]):
            return

        ids = results["ids"][0]
        documents = results["documents"][0]
        metadatas = results["metadatas"][0]
        distances = results["distances"][0]

        zipped = zip(ids, documents, metadatas, distances)

        for _, document, metadata, distance in zipped:
            print("\n", metadata)
            print(distance, "\n")


# test_path = "/home/scossar/zalgorithm/content/notes/a-simple-document-for-testing.md"
# test_path = "/home/scossar/zalgorithm/content/notes/roger-bacon-as-magician.md"
# test_path = "/home/scossar/zalgorithm/content/notes/notes-on-cognitive-and-morphological-patterns.md"

if __name__ == "__main__":
    embeddings_generator = EmbeddingGenerator()
    # embeddings_generator.generate_embedding(Path(test_path))
    embeddings_generator.generate_embeddings()
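
With the script guarded by __main__, the module can be imported without kicking off a full run, so the test query method is easy to try from a REPL (the query string here is just an example):

from generate_embeddings import EmbeddingGenerator

generator = EmbeddingGenerator()
generator.query_collection("parsing html files with lxml")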