A lightweight Go service that watches directories and streams completed files into Apache Pulsar topics. Designed for reliable file-to-event pipelines at the edge or in data ingestion systems.
🪶 filecast
filecast is a lightweight Go daemon that watches one or more directories for new or modified files and streams completed files into Apache Pulsar topics. It’s designed for high-throughput ingest pipelines, data lakes, and edge systems where file-based workflows need to be published as event streams.
✨ Features
- 🕵️ Directory watcher using inotify / kqueue (github.com/rjeczalik/notify)
- 🔄 Recursive mode for nested directory trees
- 📡 Direct Pulsar producer per file, with optional chunked messages for large payloads
- 🔐 JWT / token authentication via environment variables
- 🧾 Message properties automatically include file path, size, hash, and name
- 💾 xattr tagging (`user.published` and `user.pulsar-final-msgid`) after successful publication
- ⚙️ Configurable base topic per tenant and namespace
- 🧩 Command-line interface built with Cobra
- 🪣 Stateless by design, suitable for container deployment or batch processing
🚀 Quick Start
1. Build
```bash
go build -o filecast .
```
2. Set up environment
```bash
export PULSAR_URL=pulsar://localhost:6650
export PULSAR_JWT="your-jwt-token" # optional if authentication is enabled
```
3. Run the watcher
```bash
./filecast run \
  --watch /data/incoming \
  --recursive \
  --tenant public \
  --namespace default \
  --topic-base files \
  --property env=prod \
  --property source=edge01
```
Whenever a file is closed after writing or moved into the watched directory, filecast will:
- Compute its SHA-256 checksum
- Publish the file content as a binary payload to Pulsar
- Set extended attributes:
  - `user.published=true`
  - `user.pulsar-final-msgid=<pulsar message id>`
🧠 Topic Naming
Each file is published to its own topic:
```
persistent://<tenant>/<namespace>/<topic-base>/<filename>.stream
```
Example:
```
persistent://public/default/files/report_2025-11-10.csv.stream
```
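The naming scheme can be expressed as a small helper. This sketch assumes that only the file's base name is used (as in the examples and the log output below), not its path relative to the watched directory; `topicFor` is a hypothetical name.

```go
package main

import (
	"fmt"
	"path/filepath"
)

// topicFor (hypothetical helper) follows the documented scheme
// persistent://<tenant>/<namespace>/<topic-base>/<filename>.stream.
// Assumption: <filename> is the base name of the file, without directories.
func topicFor(tenant, namespace, topicBase, filePath string) string {
	return fmt.Sprintf("persistent://%s/%s/%s/%s.stream",
		tenant, namespace, topicBase, filepath.Base(filePath))
}

func main() {
	fmt.Println(topicFor("public", "default", "files", "/data/incoming/report_2025-11-10.csv"))
}
```

Under this assumption, files with the same name in different subdirectories of a recursive watch would map to the same topic.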
⚙️ Options
| Flag | Description |
|---|---|
| `--watch`, `-w` | One or more directories to watch (repeatable) |
| `--recursive` | Watch directories recursively |
| `--tenant` | Pulsar tenant (default: `public`) |
| `--namespace` | Pulsar namespace (default: `default`) |
| `--topic-base` | Base topic prefix (default: `files`) |
| `--enable-chunking`, `-c` | Enable Pulsar native message chunking |
| `--xattr` | Set extended attributes on success (default: `true`) |
| `--property`, `-p` | Custom message property in `key=value` format (repeatable) |
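Repeated `--property key=value` flags have to be turned into a map before they can be attached to a message. This is a minimal sketch of that parsing step, not filecast's code. The helper `parseProperties` is hypothetical, and it assumes that the first `=` separates key from value, so values may themselves contain `=`. (With Cobra/pflag, a repeatable flag that should not be split on commas is typically declared with `StringArrayVarP`. How filecast declares it is not shown here.)

```go
package main

import (
	"fmt"
	"strings"
)

// parseProperties (hypothetical helper) converts repeated key=value
// strings into a map. Splitting on the first '=' keeps '=' inside values.
func parseProperties(pairs []string) (map[string]string, error) {
	props := make(map[string]string, len(pairs))
	for _, p := range pairs {
		k, v, ok := strings.Cut(p, "=")
		if !ok || strings.TrimSpace(k) == "" {
			return nil, fmt.Errorf("invalid property %q, expected key=value", p)
		}
		props[strings.TrimSpace(k)] = v
	}
	return props, nil
}

func main() {
	props, err := parseProperties([]string{"env=prod", "source=edge01"})
	if err != nil {
		panic(err)
	}
	fmt.Println(props["env"], props["source"])
}
```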
🧩 Message Properties
Each published Pulsar message automatically includes:
| Property | Example | Description |
|---|---|---|
| `file.path` | `/data/incoming/report.csv` | Full file path |
| `file.name` | `report.csv` | Base file name |
| `file.size` | `24576` | File size in bytes |
| `file.sha256` | `4a8b6e…` | SHA-256 checksum |
| `stream.version` | `1` | Version marker |
| `user.*` | — | Any custom `--property` values |
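The property set in the table can be assembled as a `map[string]string`, the type used by the `Properties` field of `ProducerMessage` in the Go Pulsar client (pulsar-client-go). This is an illustrative sketch. `messageProperties` is a hypothetical helper, and the `user.` prefix for custom values is an assumption drawn from the `user.*` row above.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strconv"
)

// messageProperties (hypothetical helper) builds the documented per-message
// properties. Assumption: custom --property entries are stored as "user.<key>".
func messageProperties(path string, size int64, sha256Hex string, custom map[string]string) map[string]string {
	props := map[string]string{
		"file.path":      path,
		"file.name":      filepath.Base(path),
		"file.size":      strconv.FormatInt(size, 10),
		"file.sha256":    sha256Hex,
		"stream.version": "1",
	}
	for k, v := range custom {
		props["user."+k] = v
	}
	return props
}

func main() {
	p := messageProperties("/data/incoming/report.csv", 24576, "placeholder-hash",
		map[string]string{"env": "prod"})
	fmt.Println(p["file.name"], p["file.size"], p["user.env"])
}
```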
🔐 Authentication
If PULSAR_JWT is set, the client uses token authentication:
```bash
export PULSAR_JWT=$(cat ~/.pulsar/token.jwt)
```
Otherwise, it connects anonymously to the configured PULSAR_URL.
🪶 Example Logs
```text
INFO[0001] Watching recursively: /data/incoming
INFO[0003] published file
topic=persistent://public/default/files/report.csv.stream
nativeChunking=false
finalMsgID="000001-0001-00000001"
```
🧱 Extended Attributes
After successful publish:
```bash
getfattr -d /data/incoming/report.csv
```
```text
# file: report.csv
user.published="true"
user.pulsar-final-msgid="000001-0001-00000001"
```
These xattrs can be used by other processes to detect upload status.
🧰 Version
```bash
./filecast version
```
```text
dev
```
📦 Dependencies
🧩 License
MIT License © 2025 Matthias Petermann