Step 3 of 8

Part 2: Outbox Extension (swim-outbox-kafka-ffice)

The consumer needs an outbox router to classify processed events and send them to the correct Kafka topic. This is Extension Point EP3 (the outbox router SPI).

You cloned swim-developer-extensions during Setup. Navigate to that directory.

2.1 Create the module

Inside the swim-developer-extensions repository, create the directory structure:

swim-outbox-kafka-ffice/
  pom.xml
  src/main/java/com/github/swim_developer/extension/outbox/kafka/ffice/
    FficeEventCategory.java
    FficeMessageClassifier.java
    FficeKafkaOutboxRouter.java

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Inherits from the swim-extensions parent POM located one directory above this module. -->
    <parent>
        <groupId>com.github.swim-developer</groupId>
        <artifactId>swim-extensions</artifactId>
        <version>1.0.0-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>

    <artifactId>swim-outbox-kafka-ffice</artifactId>
    <name>SWIM Extension - Outbox Kafka FF-ICE</name>
    <description>SwimOutboxRouter implementation that dispatches FF-ICE outbox events to Kafka topics by message type.</description>

    <!-- No dependency declares a version here; presumably the parent POM manages them (confirm in ../pom.xml). -->
    <dependencies>
        <!-- Presumably the source of AbstractOutboxRouter, which FficeKafkaOutboxRouter extends (confirm). -->
        <dependency>
            <groupId>com.github.swim-developer</groupId>
            <artifactId>swim-framework-core</artifactId>
        </dependency>
        <!-- Presumably supplies the Emitter, Channel and Kafka Record types used by the router (confirm). -->
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-messaging-kafka</artifactId>
        </dependency>
        <!-- Provided scope: on the compile classpath only, not passed on to consumers of this jar. The router uses @Slf4j. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Runs the jandex goal during the build to produce a class index for this jar.
                 NOTE(review): presumably required so the consuming application can discover the
                 @ApplicationScoped router bean packaged here; confirm against the Quarkus CDI guide. -->
            <plugin>
                <groupId>io.smallrye</groupId>
                <artifactId>jandex-maven-plugin</artifactId>
                <version>3.5.3</version>
                <executions>
                    <execution>
                        <id>make-index</id>
                        <goals><goal>jandex</goal></goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

FficeEventCategory.java

package com.github.swim_developer.extension.outbox.kafka.ffice;

/**
 * Routing categories that {@code FficeMessageClassifier.classify} assigns to an FF-ICE payload.
 *
 * <p>{@code FficeKafkaOutboxRouter} selects the outgoing channel from the category;
 * any category without a dedicated channel (currently only {@link #UNKNOWN}) goes to the DLQ channel.
 */
public enum FficeEventCategory {
    /** Message types {@code FILED_FLIGHT_PLAN} and {@code PRELIMINARY_FLIGHT_PLAN}. */
    FLIGHT_PLAN,
    /** Message types {@code FLIGHT_PLAN_UPDATE}, {@code PLANNING_STATUS} and {@code FILING_STATUS}. */
    FLIGHT_UPDATE,
    /** Message types {@code FLIGHT_DEPARTURE}, {@code FLIGHT_ARRIVAL} and {@code FLIGHT_CANCELLATION}. */
    OPERATIONS,
    /** Message types {@code TRIAL_REQUEST} and {@code TRIAL_RESPONSE}. */
    TRIAL,
    /** Message type {@code SUBMISSION_RESPONSE}. */
    SUBMISSION,
    /** Message types {@code FLIGHT_DATA_REQUEST} and {@code FLIGHT_DATA_RESPONSE}. */
    DATA,
    /** Fallback when none of the known message type values is found in the payload. */
    UNKNOWN
}

FficeMessageClassifier.java

Classifies an XML payload by inspecting the <ffice:type> element value:

| XML type value | Category |
| --- | --- |
| FILED_FLIGHT_PLAN, PRELIMINARY_FLIGHT_PLAN | FLIGHT_PLAN |
| FLIGHT_PLAN_UPDATE, PLANNING_STATUS, FILING_STATUS | FLIGHT_UPDATE |
| FLIGHT_DEPARTURE, FLIGHT_ARRIVAL, FLIGHT_CANCELLATION | OPERATIONS |
| TRIAL_REQUEST, TRIAL_RESPONSE | TRIAL |
| SUBMISSION_RESPONSE | SUBMISSION |
| FLIGHT_DATA_REQUEST, FLIGHT_DATA_RESPONSE | DATA |
| anything else | UNKNOWN |

package com.github.swim_developer.extension.outbox.kafka.ffice;

/**
 * Stateless helpers that inspect a raw FF-ICE payload as text (no XML parser is involved).
 *
 * <p>{@link #classify(String)} maps the payload to a {@link FficeEventCategory} by looking for a
 * known message type value, and {@link #extractGufi(String)} pulls out the text of the
 * {@code globallyUniqueFlightIdentifier} element. Both methods tolerate {@code null} input.
 */
public final class FficeMessageClassifier {

    /** Placeholder returned when no usable GUFI can be found. */
    private static final String UNKNOWN_VALUE = "unknown";

    /**
     * Matches an opening {@code globallyUniqueFlightIdentifier} tag (with or without a namespace
     * prefix and with or without attributes) and captures the text up to the next closing tag.
     * The look-behind rejects self-closing tags, and closing tags cannot match because the
     * pattern requires the name (or its prefix) directly after {@code <}.
     * Fully-qualified names are used because this file has no import section.
     */
    private static final java.util.regex.Pattern GUFI_ELEMENT = java.util.regex.Pattern.compile(
            "<(?:[\\w.-]+:)?globallyUniqueFlightIdentifier(?:\\s[^>]*)?(?<!/)>([^<]*)</");

    private FficeMessageClassifier() {
    }

    /**
     * Determines the routing category of a payload.
     *
     * <p>The groups are checked in a fixed order, so a payload containing type values from
     * several groups is assigned the first matching category.
     *
     * @param xml the raw payload; may be {@code null} or empty
     * @return the matching category, or {@link FficeEventCategory#UNKNOWN} when the payload is
     *         {@code null}, empty, or contains none of the known type values
     */
    public static FficeEventCategory classify(String xml) {
        if (xml == null || xml.isEmpty()) {
            return FficeEventCategory.UNKNOWN;
        }
        if (containsAnyMessageType(xml, "FILED_FLIGHT_PLAN", "PRELIMINARY_FLIGHT_PLAN")) {
            return FficeEventCategory.FLIGHT_PLAN;
        }
        if (containsAnyMessageType(xml, "FLIGHT_PLAN_UPDATE", "PLANNING_STATUS", "FILING_STATUS")) {
            return FficeEventCategory.FLIGHT_UPDATE;
        }
        if (containsAnyMessageType(xml, "FLIGHT_DEPARTURE", "FLIGHT_ARRIVAL", "FLIGHT_CANCELLATION")) {
            return FficeEventCategory.OPERATIONS;
        }
        if (containsAnyMessageType(xml, "TRIAL_REQUEST", "TRIAL_RESPONSE")) {
            return FficeEventCategory.TRIAL;
        }
        if (containsAnyMessageType(xml, "SUBMISSION_RESPONSE")) {
            return FficeEventCategory.SUBMISSION;
        }
        if (containsAnyMessageType(xml, "FLIGHT_DATA_REQUEST", "FLIGHT_DATA_RESPONSE")) {
            return FficeEventCategory.DATA;
        }
        return FficeEventCategory.UNKNOWN;
    }

    /**
     * Extracts the trimmed text of the first {@code globallyUniqueFlightIdentifier} element.
     *
     * <p>The opening tag may carry a namespace prefix and attributes. Closing tags and
     * self-closing tags are never mistaken for the start of the element.
     *
     * @param xml the raw payload; may be {@code null}
     * @return the identifier text, or {@code "unknown"} when the payload is {@code null}, the
     *         element is absent, or its text is blank
     */
    public static String extractGufi(String xml) {
        if (xml == null) {
            return UNKNOWN_VALUE;
        }
        java.util.regex.Matcher matcher = GUFI_ELEMENT.matcher(xml);
        if (!matcher.find()) {
            return UNKNOWN_VALUE;
        }
        String value = matcher.group(1).trim();
        return value.isEmpty() ? UNKNOWN_VALUE : value;
    }

    /** Returns {@code true} if at least one of the given type values occurs in the payload. */
    private static boolean containsAnyMessageType(String xml, String... types) {
        for (String type : types) {
            if (containsMessageType(xml, type)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Returns {@code true} if the type value occurs as the complete text of an element
     * ({@code >TYPE<}) or as a double-quoted string ({@code "TYPE"}). The delimiters keep a
     * shorter type name from matching inside a longer one.
     */
    private static boolean containsMessageType(String xml, String type) {
        return xml.contains(">" + type + "<") || xml.contains("\"" + type + "\"");
    }
}

FficeKafkaOutboxRouter.java

package com.github.swim_developer.extension.outbox.kafka.ffice;

import com.github.swim_developer.framework.infrastructure.out.messaging.AbstractOutboxRouter;
import io.micrometer.core.instrument.MeterRegistry;
import io.smallrye.reactive.messaging.kafka.Record;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

/**
 * Outbox router for FF-ICE events: classifies each payload with {@link FficeMessageClassifier}
 * and hands it to the emitter of the matching channel, keyed by message id.
 *
 * <p>Payloads whose category has no dedicated channel are sent to the {@code out-ffice-dlq}
 * channel. Sending is asynchronous; a failed send is reported through an error log entry.
 */
@Slf4j
@ApplicationScoped
public class FficeKafkaOutboxRouter extends AbstractOutboxRouter {

    private final Emitter<Record<String, String>> flightPlanEmitter;
    private final Emitter<Record<String, String>> flightUpdateEmitter;
    private final Emitter<Record<String, String>> operationsEmitter;
    private final Emitter<Record<String, String>> trialEmitter;
    private final Emitter<Record<String, String>> submissionEmitter;
    private final Emitter<Record<String, String>> dataEmitter;
    private final Emitter<Record<String, String>> dlqEmitter;

    /**
     * Creates the router with one emitter per outgoing channel.
     *
     * @param meterRegistry       registry passed to the base class
     * @param flightPlanEmitter   emitter bound to channel {@code out-flight-plan}
     * @param flightUpdateEmitter emitter bound to channel {@code out-flight-update}
     * @param operationsEmitter   emitter bound to channel {@code out-operations}
     * @param trialEmitter        emitter bound to channel {@code out-trial}
     * @param submissionEmitter   emitter bound to channel {@code out-submission}
     * @param dataEmitter         emitter bound to channel {@code out-data}
     * @param dlqEmitter          emitter bound to channel {@code out-ffice-dlq}
     */
    @Inject
    public FficeKafkaOutboxRouter(
            MeterRegistry meterRegistry,
            @Channel("out-flight-plan") Emitter<Record<String, String>> flightPlanEmitter,
            @Channel("out-flight-update") Emitter<Record<String, String>> flightUpdateEmitter,
            @Channel("out-operations") Emitter<Record<String, String>> operationsEmitter,
            @Channel("out-trial") Emitter<Record<String, String>> trialEmitter,
            @Channel("out-submission") Emitter<Record<String, String>> submissionEmitter,
            @Channel("out-data") Emitter<Record<String, String>> dataEmitter,
            @Channel("out-ffice-dlq") Emitter<Record<String, String>> dlqEmitter) {
        super(meterRegistry);
        this.flightPlanEmitter = flightPlanEmitter;
        this.flightUpdateEmitter = flightUpdateEmitter;
        this.operationsEmitter = operationsEmitter;
        this.trialEmitter = trialEmitter;
        this.submissionEmitter = submissionEmitter;
        this.dataEmitter = dataEmitter;
        this.dlqEmitter = dlqEmitter;
    }

    /**
     * Classifies the payload, records a counter increment and emits the payload on the channel
     * that matches its category.
     *
     * @param messageId used as the Kafka record key
     * @param payload   the event payload, used as the Kafka record value
     */
    @Override
    public void route(String messageId, String payload) {
        FficeEventCategory category = FficeMessageClassifier.classify(payload);
        String gufi = FficeMessageClassifier.extractGufi(payload);

        // NOTE(review): the "gufi" tag takes a different value for practically every flight, so this
        // counter creates one time series per flight. Consider dropping the tag once it is confirmed
        // that no dashboard or alert relies on it.
        incrementCounter("ffice_events_processed_total", "type", category.name(), "gufi", gufi);

        Record<String, String> kafkaRecord = Record.of(messageId, payload);
        String topicName = emit(category, kafkaRecord);

        // Logged right after the record is handed to the emitter; delivery itself completes asynchronously.
        log.debug("FF-ICE event sent to Kafka - MessageId: {}, Topic: {}, Category: {}, GUFI: {}",
                messageId, topicName, category, gufi);
    }

    /**
     * Picks the emitter for the category and sends the record through it.
     *
     * @return the topic label for the chosen channel; in this class it is used for log output only
     */
    private String emit(FficeEventCategory category, Record<String, String> kafkaRecord) {
        return switch (category) {
            case FLIGHT_PLAN -> send(flightPlanEmitter, kafkaRecord, "ffice-flight-plan-topic");
            case FLIGHT_UPDATE -> send(flightUpdateEmitter, kafkaRecord, "ffice-flight-update-topic");
            case OPERATIONS -> send(operationsEmitter, kafkaRecord, "ffice-operations-topic");
            case TRIAL -> send(trialEmitter, kafkaRecord, "ffice-trial-topic");
            case SUBMISSION -> send(submissionEmitter, kafkaRecord, "ffice-submission-topic");
            case DATA -> send(dataEmitter, kafkaRecord, "ffice-data-topic");
            default -> send(dlqEmitter, kafkaRecord, "ffice-dlq-topic");
        };
    }

    /**
     * Sends the record and attaches a completion callback so that a failed send is logged
     * instead of being dropped together with the returned completion stage.
     *
     * @return {@code topicName}, unchanged, for the caller's log statement
     */
    private String send(Emitter<Record<String, String>> emitter, Record<String, String> kafkaRecord,
            String topicName) {
        emitter.send(kafkaRecord).whenComplete((ignored, failure) -> {
            if (failure != null) {
                log.error("FF-ICE event could not be sent to Kafka - MessageId: {}, Topic: {}",
                        kafkaRecord.key(), topicName, failure);
            }
        });
        return topicName;
    }

    /** Exposes the DLQ emitter to the base class. */
    @Override
    protected Emitter<Record<String, String>> getDlqEmitter() {
        return dlqEmitter;
    }

    /** Returns the fixed service label {@code "FF-ICE"} for the base class. */
    @Override
    protected String getServiceLabel() {
        return "FF-ICE";
    }
}

2.2 Register the module

Add the new module to the <modules> section of the parent pom.xml of swim-developer-extensions:

<module>swim-outbox-kafka-ffice</module>

2.3 Build and install

From the swim-developer-extensions directory:

./mvnw clean install -DskipTests        # Linux / macOS
mvnw.cmd clean install -DskipTests      # Windows