Pulsar Stream Functions
This repository contains examples and experiments with Pulsar Functions for processing, filtering, and governing data streams in Apache Pulsar.
Bill of Materials
A local Pulsar standalone instance, running via Podman:
podman run -it --rm \
-p 6650:6650 \
-p 8080:8080 \
docker.io/apachepulsar/pulsar:4.0.8 \
bin/pulsar standalone
Tooling
Pulsar CLI
Repository:
https://forge.ext.d2ux.net/OpenLab/pulsar-cli
Consume messages:
export PULSAR_URL=pulsar://127.0.0.1:6650
pulsar-cli consumer -t 'persistent://public/default/test' -s debug
Produce messages:
export PULSAR_URL=pulsar://127.0.0.1:6650
echo "Hello Pulsar" | pulsar-cli producer -t 'persistent://public/default/test'
Note:
Use 127.0.0.1 explicitly. On some systems (e.g. Debian), localhost
resolves to the IPv6 address ::1 first, and the ports published by Podman may not be reachable over IPv6.
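To check how localhost resolves on a given machine, a small stdlib-only Java sketch can list the addresses the JVM sees. The class name LocalhostResolution and its helper methods are illustrative and not part of this repository; the order of the printed addresses depends on the local resolver configuration.

```java
import java.io.UncheckedIOException;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical diagnostic helper: shows which address families a host name resolves to. */
final class LocalhostResolution {
    private LocalhostResolution() {}

    /** Classifies a resolved address as IPv4 or IPv6. */
    static String family(InetAddress address) {
        return (address instanceof Inet6Address) ? "IPv6" : "IPv4";
    }

    /** Resolves the host and returns the family of each address, in resolver order. */
    static List<String> families(String host) {
        try {
            List<String> result = new ArrayList<>();
            for (InetAddress address : InetAddress.getAllByName(host)) {
                result.add(family(address));
            }
            return result;
        } catch (UnknownHostException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws UnknownHostException {
        // Print every address that "localhost" resolves to on this machine.
        for (InetAddress address : InetAddress.getAllByName("localhost")) {
            System.out.println(family(address) + "  " + address.getHostAddress());
        }
    }
}
```

If an IPv6 entry is listed first, clients that are given localhost may try ::1 before 127.0.0.1, which is the situation the note above works around.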
Pulsar Shell
Download and unpack:
curl -L -o apache-pulsar-shell-4.1.2-bin.tar.gz "https://www.apache.org/dyn/closer.lua/pulsar/pulsar-4.1.2/apache-pulsar-shell-4.1.2-bin.tar.gz?action=download"
tar xzvf apache-pulsar-shell-4.1.2-bin.tar.gz
cd apache-pulsar-shell-4.1.2/
Tip:
Add ~/bin to your PATH, unpack Pulsar Shell to
~/Apps/apache-pulsar-shell-4.1.2, and create a symlink:
ln -s ~/Apps/apache-pulsar-shell-4.1.2/bin/pulsar-shell ~/bin/pulsar-shell
Start the shell:
pulsar-shell
Quick test:
default(localhost)> admin tenants list
public
pulsar
Function Development
Coding
Example function: a filter for order messages that forwards only those
containing the keyword "digital" (matched case-insensitively) and drops everything else.
package net.d2ux.stream.order.filter;

import java.util.Locale;

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

/** Forwards only order messages that contain the keyword "digital". */
public final class DigitalOrderFilter implements Function<String, String> {

    private static final String KEYWORD = "digital";

    @Override
    public String process(String input, Context context) {
        // Returning null tells Pulsar Functions not to publish an output message.
        if (input == null || input.isBlank()) {
            context.getLogger().debug("Skipping empty message");
            return null;
        }
        // Locale.ROOT keeps the case-insensitive match independent of the JVM default locale.
        if (input.toLowerCase(Locale.ROOT).contains(KEYWORD)) {
            context.getLogger().info("Digital order detected");
            return input;
        }
        return null;
    }
}
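The keyword check can also be exercised without a broker. The following is a minimal sketch that mirrors the filter's decision as a pure function with no Pulsar dependency; the class DigitalOrderPredicate and its matches method are hypothetical names introduced here for illustration, and the use of Locale.ROOT is a choice of this sketch.

```java
import java.util.List;
import java.util.Locale;

/** Hypothetical helper: the keyword check of the filter, without Pulsar types. */
final class DigitalOrderPredicate {
    private static final String KEYWORD = "digital";

    private DigitalOrderPredicate() {}

    /** Returns true when a message would be forwarded, false when it would be dropped. */
    static boolean matches(String input) {
        if (input == null || input.isBlank()) {
            return false; // empty messages are dropped
        }
        // Locale.ROOT keeps lower-casing independent of the JVM default locale.
        return input.toLowerCase(Locale.ROOT).contains(KEYWORD);
    }

    public static void main(String[] args) {
        // Sample payloads; the first two match the messages used in the Testing section.
        for (String message : List.of("Bestellung analog", "Bestellung digital", "DIGITAL order")) {
            System.out.println(message + " -> " + (matches(message) ? "forward" : "drop"));
        }
    }
}
```

Keeping the decision in a side-effect-free method like this makes it straightforward to cover with ordinary unit tests before deploying the function.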
Build
mvn install
Hot Deployment
admin functions create \
--tenant public \
--namespace default \
--name order.filter.digital \
--jar /absolute/path/to/filter-order-digital-1.0.0.jar \
--classname net.d2ux.stream.order.filter.DigitalOrderFilter \
--inputs persistent://public/default/orders.raw \
--output persistent://public/default/orders.filtered.digital
Important: Pulsar Shell requires an absolute path to the JAR.
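To obtain the absolute path for the --jar argument from a relative build output path, a small stdlib-only sketch such as the following can help. The class name JarPathResolver is hypothetical, and the default path target/filter-order-digital-1.0.0.jar assumes Maven's standard output directory, which may differ in this repository's module layout.

```java
import java.nio.file.Path;

/** Hypothetical helper: resolves a JAR path to the absolute form expected by --jar. */
final class JarPathResolver {
    private JarPathResolver() {}

    /** Turns a possibly relative path into a normalized absolute path. */
    static Path toAbsolute(String jarPath) {
        return Path.of(jarPath).toAbsolutePath().normalize();
    }

    public static void main(String[] args) {
        // Assumed Maven default output location; pass the real path as the first argument.
        String relative = args.length > 0 ? args[0] : "target/filter-order-digital-1.0.0.jar";
        System.out.println(toAbsolute(relative));
    }
}
```

The path is resolved against the current working directory, so the sketch should be run from the directory the relative path refers to.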
Testing
export PULSAR_URL=pulsar://127.0.0.1:6650
pulsar-cli consumer -t 'persistent://public/default/orders.filtered.digital' -s debug
echo "Bestellung analog" | pulsar-cli producer -t 'persistent://public/default/orders.raw'
echo "Bestellung digital" | pulsar-cli producer -t 'persistent://public/default/orders.raw'
Only messages containing "digital" should appear on the filtered topic; of the two messages above, that is "Bestellung digital".
Update Function
admin functions update \
--tenant public \
--namespace default \
--name order.filter.digital \
--jar /absolute/path/to/filter-order-digital-1.0.1.jar \
--classname net.d2ux.stream.order.filter.DigitalOrderFilter \
--inputs persistent://public/default/orders.raw \
--output persistent://public/default/orders.filtered.digital
Uninstall Function
Delete the function:
admin functions delete \
--tenant public \
--namespace default \
--name order.filter.digital
Remove the deployed package:
admin packages delete \
function://public/default/order.filter.digital@0
Scope
This repository is intended as:
- a hands-on reference for Pulsar Functions
- a sandbox for stream-level governance ideas
- a starting point for production-grade stream logic