Generating embeddings for blog semantic search
The source of truth for the blog’s content is its markdown files. Locally, I read the blog as markdown files rendered in Neovim.
Hugo builds HTML files from the markdown; those HTML files are what’s displayed on the web. I haven’t yet decided whether the content used to generate the embeddings should be extracted from the markdown or from the HTML files. (Edit (Jan, 2026): the markdown files are the source of truth, but the generated HTML files are being used as the source for the embeddings.)
The postchunker library
.
├── __init__.py # make it a library
├── postchunker.py # generate HTML fragments and text chunks
See Parsing HTML files with lxml and Learning about parsing XML and HTML with lxml for details about lxml and XML in general.
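As a quick orientation, the chunker leans on a small lxml pattern: parse the generated HTML, find the article element, and walk its direct children. A minimal sketch (the path is illustrative, not part of the library):
# illustrative lxml pattern used by postchunker
from lxml import html

tree = html.parse("public/notes/some-post/index.html")  # hypothetical generated file
article = tree.find(".//article")
if article is not None:
    for child in article.iterchildren():
        print(child.tag)  # h2, p, div, ...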
The code, lightly commented for now. There was a bit of trial and error in the approach; it’s “good enough”, but I’ll look at it again soon.
# postchunker.py
from lxml import etree, html
from lxml.html import HtmlElement
__all__ = ["extract_sections"]
def serialize(fragment: HtmlElement, pretty_print: bool = False):
return html.tostring(
fragment,
pretty_print=pretty_print,
method="html",
encoding="unicode",
)
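# build an <h2><a> element that links back to the section on the original page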
def heading_link(
original_heading: HtmlElement, headings_path: list[str], rel_path: str
):
href = f"/{rel_path}"
id = original_heading.attrib.get("id")
if id:
href = f"{href}#{id}"
anchor = etree.Element("a", {"href": href})
anchor.text = " > ".join(headings_path)
heading = etree.Element("h2")
heading.append(anchor)
return heading
def get_heading_level(tag: str) -> int:
heading_levels = {"h1": 0, "h2": 1, "h3": 2, "h4": 3, "h5": 4, "h6": 5}
return heading_levels[tag]
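# drop elements that shouldn't be embedded: footnote blocks, taxonomy term lists, and (for now) the time element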
def exclude_element(element: HtmlElement) -> bool:
if element.get("class") == "footnotes":
return True
if element.get("class") == "terms":
return True
# do something better with this
if element.tag == "time":
return True
return False
# TODO: clean up; the element isn't an HtmlElement, it's an etree.Element?
def fix_relative_links(element: HtmlElement, rel_path: str):
for e in element.iter():
if e.tag == "a":
href = e.attrib["href"]
if href and href.startswith("#"):
e.attrib["href"] = f"/{rel_path}{href}"
return element
def has_text(element: HtmlElement) -> bool:
    text = "".join(element.itertext()).strip()
    return bool(text)
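# turn a section's paragraphs and highlighted code blocks into heading-prefixed text chunks for embedding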
def section_texts(section: HtmlElement, headings_path: list[str]):
section_heading = " > ".join(headings_path) + ": "
section_heading_length = len(section_heading)
texts = []
for element in section.iterchildren():
if element.tag == "p":
text = "".join(element.itertext())
# remove unnecessary newline characters
text = " ".join(text.splitlines())
texts.append({"tag": "p", "text": text})
elif element.attrib.get("class") == "highlight":
code_element = element.find(".//code")
if code_element is not None:
lang = code_element.get("class")
code = f"({lang}):\n"
for line in code_element.iterchildren():
line_text = "".join(line.itertext())
code += line_text
if len(texts) and texts[-1].get("tag") == "p":
last_paragraph = texts[-1]["text"]
code = f"{last_paragraph}\n{code}"
texts[-1] = {"tag": "code", "text": code}
else:
texts.append({"tag": "code", "text": code})
sections = []
word_count = 0
current_section = ""
for entry in texts:
word_count += len(entry["text"].split(" "))
        # keep adding to the current chunk while under the rough word budget and the entry isn't code
        if word_count < (256 - section_heading_length) and entry["tag"] != "code":
if current_section:
current_section += f"\n{entry['text']}"
else:
current_section = entry["text"]
else:
if current_section:
sections.append(section_heading + current_section)
current_section = entry["text"]
word_count = 0
sections.append(section_heading + current_section)
return sections
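# split the page's <article> into heading-delimited sections; each section gets a serialized
# HTML fragment, a heading link, its headings path, and its text chunks for embedding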
def extract_sections(filename: str, rel_path: str):
tree = html.parse(filename)
    root = tree.find(".//article")
    if root is None:  # no <article> element to extract from
        return []
heading_tags = ("h1", "h2", "h3", "h4", "h5", "h6")
sections = []
current_heading = None
current_fragment = None
embeddings_text = []
headings_path = []
for child in root.iterchildren():
if child.tag in heading_tags:
if current_fragment is not None and has_text(current_fragment):
current_fragment = fix_relative_links(current_fragment, rel_path)
html_fragment = serialize(current_fragment, pretty_print=False)
html_heading = serialize(current_heading, pretty_print=False)
embeddings_text = section_texts(current_fragment, headings_path)
sections.append(
{
"html_fragment": html_fragment,
"html_heading": html_heading,
"headings_path": headings_path,
"embeddings_text": embeddings_text,
}
)
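            # start a fragment for the section that follows this heading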
current_fragment = etree.Element("div", {"class": "article-fragment"})
heading_level = get_heading_level(child.tag)
            # itertext() keeps the full heading text even when it contains inline elements
            headings_path = headings_path[:heading_level] + [
                "".join(child.itertext()).strip()
            ]
current_heading = heading_link(child, headings_path, rel_path)
elif current_fragment is not None:
if not exclude_element(child):
current_fragment.append(child)
    if current_fragment is not None and has_text(current_fragment):
        current_fragment = fix_relative_links(current_fragment, rel_path)
        html_fragment = serialize(current_fragment, pretty_print=False)
html_heading = serialize(current_heading, pretty_print=False)
embeddings_text = section_texts(current_fragment, headings_path)
sections.append(
{
"html_fragment": html_fragment,
"html_heading": html_heading,
"headings_path": headings_path,
"embeddings_text": embeddings_text,
}
)
return sections
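A usage sketch, with hypothetical paths; each returned dict describes one heading-delimited section:
# illustrative usage of extract_sections
from postchunker import extract_sections

sections = extract_sections(
    "public/notes/some-post/index.html",  # hypothetical generated HTML file
    "notes/some-post",  # relative URL path, used to build section links
)
for section in sections:
    print(section["headings_path"])  # e.g. ["Post title", "A subheading"]
    print(section["html_heading"])  # serialized <h2><a href=...> heading link
    print(section["embeddings_text"])  # heading-prefixed text chunks for embedding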
Generating embeddings
Read through the local Hugo content directory, find the generated index.html file for each markdown file, and generate an embedding for each of its text chunks. Embeddings are saved locally, using the Chroma persistent client.
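For example (the paths are illustrative, not actual posts): content/notes/some-post.md maps to public/notes/some-post/index.html, and notes/some-post becomes the relative URL path passed to extract_sections.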
# generate_embeddings.py
import frontmatter
import chromadb
from chromadb import Collection
import re
import unidecode
from postchunker import extract_sections
from pathlib import Path
postspath = "/home/scossar/zalgorithm/content"
# NOTES:
# see https://docs.trychroma.com/docs/embeddings/embedding-functions for details about custom
# embedding functions, i.e., creating one for "all-mpnet-base-v2"
class EmbeddingGenerator:
def __init__(
self,
content_directory: str = "/home/scossar/zalgorithm/content",
html_directory: str = "/home/scossar/zalgorithm/public",
collection_name: str = "zalgorithm",
):
self.skip_dirs: set[str] = { # these are mostly wrong
"node_modules",
".git",
".obsidian",
"__pycache__",
"venv",
".venv",
}
self.collection_name = collection_name
        # chroma uses the default `chroma` directory at the base of the project for persistence
        self.chroma_client = chromadb.PersistentClient()
self.collection = self.get_or_create_collection()
self.content_directory = content_directory
self.html_directory = html_directory
def get_or_create_collection(self) -> Collection:
return self.chroma_client.get_or_create_collection(name=self.collection_name)
# TODO: this is kind of chaotic; remember it's passed markdown files, not html files.
# either respect 'draft' frontmatter boolean, or a 'private' boolean;
def _should_process_file(self, filepath: Path) -> bool:
if any(part.startswith(".") for part in filepath.parts):
return False
if any(part.startswith("_") for part in filepath.parts):
return False
if any(skip_dir in filepath.parts for skip_dir in self.skip_dirs):
return False
if filepath.suffix.lower() not in (".md", ".markdown"):
return False
if filepath.name == "search.md":
return False
return True
# Hoping this matches Hugo's implementation
def _slugify(self, title: str) -> str:
title = unidecode.unidecode(title).lower()
title = re.sub(r"[^a-z0-9\s-]", "", title)
title = re.sub(r"[\s_]+", "-", title)
title = title.strip("-") # strip leading/trailing hyphens
return title
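    # an embedding is up to date if its stored 'updated_at' is at least as recent as the file's mtime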
def _is_up_to_date(self, file_id: str, file_mtime: float) -> bool:
existing = self.collection.get(ids=file_id, limit=1)
if not existing["ids"] or not existing["metadatas"]:
return False
last_updated_at = existing["metadatas"][0].get("updated_at", 0)
if not isinstance(last_updated_at, (int, float)):
return False # if it's an invalid timestamp
return (
last_updated_at + 1.0 >= file_mtime
) # 1 second tolerance for rounding errors
def generate_embeddings(self):
"""
Generate embeddings for blog content
"""
for path in Path(self.content_directory).rglob("*"):
if not self._should_process_file(path):
continue
self.generate_embedding(path)
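    # map a markdown file to its generated index.html and the post's relative URL path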
def get_file_paths(self, md_path: Path) -> tuple[str, str] | tuple[None, None]:
try:
rel_path = md_path.relative_to(self.content_directory)
except ValueError: # if md_path isn't a subpath of content_directory
print(
f"{md_path} isn't relative to the content directory ({self.content_directory})"
)
return None, None
rel_path_parts = rel_path.with_suffix("").parts
rel_path_parts = tuple(
s.lower() for s in rel_path_parts
) # it's possible to end up with an uppercase md filename
html_path = Path(self.html_directory) / Path(*rel_path_parts) / "index.html"
if html_path.exists():
return str(html_path), str(Path(*rel_path_parts))
else:
print(f"No file exists at {html_path}")
return None, None
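    # chunk the post's generated HTML and upsert each chunk's text and metadata into the collection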
def generate_embedding(self, filepath: Path) -> None:
html_path, relative_path = self.get_file_paths(filepath)
if not html_path or not relative_path:
return None
print(f"Processing {relative_path}")
post = frontmatter.load(str(filepath))
file_mtime = filepath.stat().st_mtime
title = str(post.get("title"))
post_id = post.get("id", None)
if not post_id:
print(
f"The post '{title}' is missing an 'id' field. Skipping generating an embedding."
)
return None
sections = extract_sections(html_path, relative_path)
for section in sections:
html_fragment = section["html_fragment"]
html_heading = section["html_heading"]
page_heading = section["headings_path"][0]
section_heading = section["headings_path"][-1]
section_heading_slug = self._slugify(section_heading)
embeddings_text = section["embeddings_text"]
for index, text in enumerate(embeddings_text):
embedding_id = f"{post_id}-{index}-{section_heading_slug}"
# TODO: (maybe) uncomment after testing
# if self._is_up_to_date(embedding_id, file_mtime):
# print(f"Not indexing {title}. Up to date.")
# return None
metadatas = {
"page_title": page_heading,
"section_heading": section_heading,
"html_heading": html_heading,
"html_fragment": html_fragment,
"updated_at": file_mtime,
}
self.collection.upsert(
ids=embedding_id, metadatas=metadatas, documents=text
)
# for testing
def query_collection(self, query: str):
results = self.collection.query(
query_texts=[query],
n_results=7,
include=["metadatas", "documents", "distances"],
)
if not (results["metadatas"] and results["documents"] and results["distances"]):
return
ids = results["ids"][0]
documents = results["documents"][0]
metadatas = results["metadatas"][0]
distances = results["distances"][0]
zipped = zip(ids, documents, metadatas, distances)
for _, document, metadata, distance in zipped:
print("\n", metadata)
print(distance, "\n")
# test_path = "/home/scossar/zalgorithm/content/notes/a-simple-document-for-testing.md"
# test_path = "/home/scossar/zalgorithm/content/notes/roger-bacon-as-magician.md"
# test_path = "/home/scossar/zalgorithm/content/notes/notes-on-cognitive-and-morphological-patterns.md"
embeddings_generator = EmbeddingGenerator()
# embeddings_generator.generate_embedding(Path(test_path))
embeddings_generator.generate_embeddings()
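For reference, a rough sketch of the custom embedding function mentioned in the notes above. It isn’t wired into the script yet; it assumes the sentence-transformers package and follows Chroma’s documented EmbeddingFunction interface, so treat it as a starting point rather than the implementation.
# sketch: custom Chroma embedding function for "all-mpnet-base-v2" (untested)
from chromadb import Documents, EmbeddingFunction, Embeddings
from sentence_transformers import SentenceTransformer

class MPNetEmbeddingFunction(EmbeddingFunction):
    def __init__(self, model_name: str = "all-mpnet-base-v2"):
        self.model = SentenceTransformer(model_name)

    def __call__(self, input: Documents) -> Embeddings:
        # encode returns a numpy array; Chroma expects plain lists of floats
        return self.model.encode(list(input)).tolist()

# the collection would then be created with the custom function, for both indexing and querying:
# collection = chroma_client.get_or_create_collection(
#     name="zalgorithm", embedding_function=MPNetEmbeddingFunction()
# )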