generic Added generic copy router 2025-12-23 10:20:18 +01:00
order Import poc 2025-12-20 13:07:03 +01:00
.gitignore Import poc 2025-12-20 13:07:03 +01:00
LICENSE Initial commit 2025-12-20 12:05:44 +00:00
pom.xml Added generic copy router 2025-12-23 10:20:18 +01:00
README.md Update README 2025-12-20 13:56:08 +01:00

Pulsar Stream Functions

This repository contains examples and experiments with Pulsar Functions for processing, filtering, and governing data streams in Apache Pulsar.


Bill of Materials

A local Pulsar standalone instance, running via Podman:

podman run -it --rm \
  -p 6650:6650 \
  -p 8080:8080 \
  docker.io/apachepulsar/pulsar:4.0.8 \
  bin/pulsar standalone
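Port 6650 is the broker's binary protocol port, which the pulsar:// URLs below use. Port 8080 is the HTTP admin/REST port. To confirm that both ports are published on the host, a small plain-Java check can help. PortCheck is a hypothetical helper and is not part of this repository. It only tests whether a TCP connection is accepted. It does not check that Pulsar itself is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether a TCP port accepts connections, e.g. to
// confirm that the ports published by `podman run -p` are reachable from the host.
final class PortCheck {

    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or host could not be resolved
        }
    }

    public static void main(String[] args) {
        // 6650: Pulsar binary protocol (pulsar://), 8080: HTTP admin/REST API
        for (int port : new int[] {6650, 8080}) {
            System.out.println("127.0.0.1:" + port + " reachable: "
                    + isReachable("127.0.0.1", port, 2000));
        }
    }
}
```

Once the container is up, run it with java PortCheck.java. This uses single-file source launch, which needs Java 11+.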

Tooling

Pulsar CLI

Repository:
https://forge.ext.d2ux.net/OpenLab/pulsar-cli

Consume messages:

export PULSAR_URL=pulsar://127.0.0.1:6650
pulsar-cli consumer -t 'persistent://public/default/test' -s debug

Produce messages:

export PULSAR_URL=pulsar://127.0.0.1:6650
echo "Hello Pulsar" | pulsar-cli producer -t 'persistent://public/default/test'

Note:
Use 127.0.0.1 explicitly. On some systems (e.g. Debian), localhost resolves to the IPv6 loopback address ::1, and the ports published by the Podman container may not be reachable over IPv6.
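To see why 127.0.0.1 is the safer choice, you can list what localhost resolves to on your machine. LocalhostCheck is a hypothetical diagnostic built on InetAddress.getAllByName. By default the JVM puts IPv4 addresses first unless java.net.preferIPv6Addresses=true is set. The output therefore shows which address families are present. It does not necessarily show the order in which other tools, such as pulsar-cli, try them.

```java
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical diagnostic: lists the addresses a host name resolves to, so you can
// see whether "localhost" includes the IPv6 loopback (::1) on this machine.
final class LocalhostCheck {

    static InetAddress[] resolve(String host) throws UnknownHostException {
        return InetAddress.getAllByName(host);
    }

    public static void main(String[] args) throws UnknownHostException {
        for (String host : new String[] {"localhost", "127.0.0.1"}) {
            for (InetAddress address : resolve(host)) {
                String family = (address instanceof Inet4Address) ? "IPv4" : "IPv6";
                System.out.println(host + " -> " + address.getHostAddress() + " (" + family + ")");
            }
        }
    }
}
```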


Pulsar Shell

Download and unpack:

curl -LO "https://www.apache.org/dyn/closer.lua/pulsar/pulsar-4.1.2/apache-pulsar-shell-4.1.2-bin.tar.gz?action=download"
tar xzvf apache-pulsar-shell-4.1.2-bin.tar.gz
cd apache-pulsar-shell-4.1.2/

Tip:
Add ~/bin to your PATH, unpack Pulsar Shell to
~/Apps/apache-pulsar-shell-4.1.2, and create a symlink:

ln -s ~/Apps/apache-pulsar-shell-4.1.2/bin/pulsar-shell ~/bin/pulsar-shell

Start the shell:

pulsar-shell

Quick test:

default(localhost)> admin tenants list
public
pulsar

Function Development

Coding

Example function: filters order messages and forwards only those containing the keyword "digital" (case-insensitive). All other messages are dropped.

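package net.d2ux.stream.order.filter;

import java.util.Locale;

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public final class DigitalOrderFilter implements Function<String, String> {

    private static final String KEYWORD = "digital";

    @Override
    public String process(String input, Context context) {

        // Returning null publishes nothing to the output topic, i.e. the message is dropped.
        if (input == null || input.isBlank()) {
            context.getLogger().debug("Skipping empty message");
            return null;
        }

        // Locale.ROOT keeps the case-insensitive match independent of the JVM default locale
        // (under a Turkish locale, "DIGITAL".toLowerCase() would yield a dotless i and not match).
        if (input.toLowerCase(Locale.ROOT).contains(KEYWORD)) {
            context.getLogger().info("Digital order detected");
            return input;
        }

        // Messages without the keyword are filtered out.
        return null;
    }
}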
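The filter decision does not depend on the broker, so you can sanity-check it with plain Java before deploying. The sketch below uses a hypothetical DigitalOrderFilterLocalCheck class, which is not part of this repository. It duplicates the function's rules without the Pulsar Functions API and prints which sample messages would be forwarded. A null result corresponds to the function returning null, which this example relies on to drop a message.

```java
import java.util.List;
import java.util.Locale;

// Hypothetical helper: mirrors the decision logic of DigitalOrderFilter without
// depending on the Pulsar Functions API, so it runs with a plain JDK.
final class DigitalOrderFilterLocalCheck {

    private static final String KEYWORD = "digital";

    // Same rules as DigitalOrderFilter.process(): null means "drop the message".
    static String filter(String input) {
        if (input == null || input.isBlank()) {
            return null;
        }
        return input.toLowerCase(Locale.ROOT).contains(KEYWORD) ? input : null;
    }

    public static void main(String[] args) {
        List<String> samples = List.of("Bestellung analog", "Bestellung digital", "DIGITAL order", "   ");
        for (String sample : samples) {
            String result = filter(sample);
            System.out.println("'" + sample + "' -> " + (result == null ? "dropped" : "forwarded"));
        }
    }
}
```

Run it with java DigitalOrderFilterLocalCheck.java on Java 11+. It should report the analog order and the blank message as dropped, and the two messages containing "digital" as forwarded.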

Build

mvn install

Hot Deployment

admin functions create \
  --tenant public \
  --namespace default \
  --name order.filter.digital \
  --jar /absolute/path/to/filter-order-digital-1.0.0.jar \
  --classname net.d2ux.stream.order.filter.DigitalOrderFilter \
  --inputs persistent://public/default/orders.raw \
  --output persistent://public/default/orders.filtered.digital

Important: run these admin commands inside Pulsar Shell, which requires an absolute path to the JAR.
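The --inputs and --output values are fully qualified topic names of the form {persistent|non-persistent}://tenant/namespace/topic. Both topics here live in the public tenant and the default namespace, the same ones passed via --tenant and --namespace. The sketch below makes that structure explicit with a hypothetical TopicRef record. It is not Pulsar's own org.apache.pulsar.common.naming.TopicName class. The helper only accepts the fully qualified form, whereas Pulsar itself also accepts short names such as orders.raw and expands them to persistent://public/default/orders.raw.

```java
// Hypothetical record (not Pulsar's TopicName class) that splits a fully qualified
// topic name of the form {persistent|non-persistent}://tenant/namespace/topic.
record TopicRef(String domain, String tenant, String namespace, String topic) {

    static TopicRef parse(String fullName) {
        int schemeEnd = fullName.indexOf("://");
        if (schemeEnd < 0) {
            throw new IllegalArgumentException("Missing '://' in " + fullName);
        }
        String domain = fullName.substring(0, schemeEnd);
        if (!domain.equals("persistent") && !domain.equals("non-persistent")) {
            throw new IllegalArgumentException("Unknown domain: " + domain);
        }
        // Split the remainder into tenant, namespace and the rest (the topic name).
        String[] parts = fullName.substring(schemeEnd + 3).split("/", 3);
        if (parts.length != 3 || parts[0].isEmpty() || parts[1].isEmpty() || parts[2].isEmpty()) {
            throw new IllegalArgumentException("Expected tenant/namespace/topic in " + fullName);
        }
        return new TopicRef(domain, parts[0], parts[1], parts[2]);
    }

    public static void main(String[] args) {
        for (String name : new String[] {
                "persistent://public/default/orders.raw",
                "persistent://public/default/orders.filtered.digital"}) {
            TopicRef ref = parse(name);
            System.out.println(ref.topic() + ": tenant=" + ref.tenant() + ", namespace=" + ref.namespace());
        }
    }
}
```

Records need Java 16+.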


Testing

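In one terminal, consume the filtered output topic:

export PULSAR_URL=pulsar://127.0.0.1:6650
pulsar-cli consumer -t 'persistent://public/default/orders.filtered.digital' -s debug

In a second terminal, produce test messages to the input topic:

export PULSAR_URL=pulsar://127.0.0.1:6650
echo "Bestellung analog"  | pulsar-cli producer -t 'persistent://public/default/orders.raw'
echo "Bestellung digital" | pulsar-cli producer -t 'persistent://public/default/orders.raw'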

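Only the message containing "digital" ("Bestellung digital", German for "digital order") should appear on the filtered topic. "Bestellung analog" is dropped by the function.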


Update Function

admin functions update \
  --tenant public \
  --namespace default \
  --name order.filter.digital \
  --jar /absolute/path/to/filter-order-digital-1.0.1.jar \
  --classname net.d2ux.stream.order.filter.DigitalOrderFilter \
  --inputs persistent://public/default/orders.raw \
  --output persistent://public/default/orders.filtered.digital

Uninstall Function

Delete the function:

admin functions delete \
  --tenant public \
  --namespace default \
  --name order.filter.digital

Remove the deployed package:

admin packages delete \
  function://public/default/order.filter.digital@0

Scope

This repository is intended as:

  • a hands-on reference for Pulsar Functions
  • a sandbox for stream-level governance ideas
  • a starting point for production-grade stream logic