Skip to content

Agent Reads and Writes

This example walks through the core SDK workflow: creating a repo, writing files, reading them back with metadata, and using transactions with expected_head for safe concurrent updates.

import json
import os

from githosted import Client, StaleHeadError, is_stale_head_error

# Authenticate using the token from the environment.
client = Client(token=os.environ.get("GITHOSTED_TOKEN"))

# 1. Create a new repo
repo = client.create_repo("agent-workspace")
print(f"Created repo: {repo.info.slug} ({repo.id})")

# 2. Write a config file
result = repo.write(
    "config.json",
    json.dumps({"model": "claude-3", "temperature": 0.7}, indent=2),
    "Initialize config",
)
print(f"Commit: {result.commit_sha}")

# 3. Read the file back — returns content + metadata for concurrency control
file = repo.read("config.json")
print(f"Content: {file.content}")
print(f"Head SHA: {file.head_sha}")
print(f"Blob SHA: {file.blob_sha}")

# 4. Write again with expected_head for optimistic concurrency.
# If another writer committed between our read and this write,
# the server rejects with StaleHeadError instead of silently overwriting.
config = json.loads(file.content)
config["temperature"] = 0.5
repo.write(
    "config.json",
    json.dumps(config, indent=2),
    "Lower temperature",
    expected_head=file.head_sha,
)

# 5. Use a transaction to write multiple files atomically
with repo.transaction("Add prompt and context files") as tx:
    tx.write("prompts/system.txt", "You are a helpful assistant.")
    tx.write("prompts/user.txt", "Summarize the following document.")
    tx.write(
        "context/metadata.json",
        json.dumps({"created_at": "2025-01-15T10:30:00Z"}),
    )

# 6. List files to verify
root_entries = repo.ls()
print("Root:", root_entries)
# [FileEntry(name='config.json', type='file'),
#  FileEntry(name='context', type='directory'),
#  FileEntry(name='prompts', type='directory')]
prompt_entries = repo.ls("prompts")
print("Prompts:", prompt_entries)
# [FileEntry(name='system.txt', type='file'),
#  FileEntry(name='user.txt', type='file')]

# 7. View the commit log
commits = repo.log(limit=5)
for c in commits:
    print(f"{c.hash[:7]} {c.subject} ({c.author_name})")

# 8. Diff between two commits
if len(commits) >= 2:
    diff = repo.diff(commits[1].hash, commits[0].hash)
    print(diff.patch)

When multiple agents or processes write to the same repo, expected_head prevents lost updates. If the branch has moved, the SDK raises StaleHeadError with the actual head SHA so you can re-read and retry:

def safe_update(repo, path: str, transform, max_attempts: int = 3):
    """Read-modify-write loop with optimistic-concurrency retry.

    Reads *path*, applies *transform* to the content, and writes the result
    back with ``expected_head`` pinned to the head SHA observed at read time.
    If another writer moved the branch in between, the server rejects the
    write with ``StaleHeadError`` and the loop re-reads and retries with
    fresh content.

    Args:
        repo: Repository handle exposing ``read`` and ``write``.
        path: File path inside the repo.
        transform: Callable mapping the current content to the new content.
        max_attempts: Read-modify-write rounds to try before giving up
            (default 3, matching the original hard-coded behavior).

    Returns:
        The write result of the first successful attempt.

    Raises:
        RuntimeError: If every attempt lost the concurrency race.
    """
    for attempt in range(max_attempts):
        file = repo.read(path)
        updated = transform(file.content)
        try:
            return repo.write(
                path,
                updated,
                f"Update {path}",
                expected_head=file.head_sha,
            )
        except StaleHeadError as err:
            # Someone else committed between our read and this write; the
            # error carries both SHAs, so log them and retry from a re-read.
            print(f"Stale head (attempt {attempt + 1}), re-reading...")
            print(f"Expected: {err.expected_head}, actual: {err.actual_head}")
            continue  # re-read and retry
    raise RuntimeError(f"Failed to update {path} after {max_attempts} attempts")

Transactions also accept expected_head to guard the entire batch of changes:

import json

# Read the current state along with its head SHA so the transaction below
# can be guarded against concurrent writers.
file = repo.read("state.json")
state = json.loads(file.content)

# The whole batch commits atomically; if the branch moved since our read,
# the server rejects the transaction instead of clobbering the other write.
with repo.transaction(
    "Update state and log",
    expected_head=file.head_sha,
) as tx:
    state["step"] = state["step"] + 1
    tx.write("state.json", json.dumps(state))
    tx.write(f"logs/step-{state['step']}.txt", "Completed step")
    # Prune the log entry from 10 steps ago to keep the directory bounded.
    tx.delete(f"logs/step-{state['step'] - 10}.txt")
  • repo.read() returns head_sha — pass it as expected_head on the next write to enable optimistic concurrency.
  • StaleHeadError is never auto-retried — you must handle it explicitly, since the SDK cannot know how to merge your intent with the intervening changes.
  • RepoBusyError is auto-retried — the SDK retries with exponential backoff (up to 3 times by default) when the repo is temporarily locked by another in-progress write.
  • Transactions commit multiple file changes under a single commit message. They accept the same ref and expected_head options as individual writes.