Step 3 of 8
Part 2: Outbox Extension (swim-outbox-kafka-ffice)
The consumer needs an outbox router to classify processed events and send them to the correct Kafka topic. This is Extension Point EP3 (the outbox router SPI).
You cloned swim-developer-extensions during Setup. Navigate to that directory.
2.1 Create the module
Inside the swim-developer-extensions repository, create the directory structure:
swim-outbox-kafka-ffice/
pom.xml
src/main/java/com/github/swim_developer/extension/outbox/kafka/ffice/
FficeEventCategory.java
FficeMessageClassifier.java
FficeKafkaOutboxRouter.java
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.github.swim-developer</groupId>
<artifactId>swim-extensions</artifactId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>swim-outbox-kafka-ffice</artifactId>
<name>SWIM Extension - Outbox Kafka FF-ICE</name>
<description>SwimOutboxRouter implementation that dispatches FF-ICE outbox events to Kafka topics by message type.</description>
<dependencies>
<dependency>
<groupId>com.github.swim-developer</groupId>
<artifactId>swim-framework-core</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-messaging-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>io.smallrye</groupId>
<artifactId>jandex-maven-plugin</artifactId>
<version>3.5.3</version>
<executions>
<execution>
<id>make-index</id>
<goals><goal>jandex</goal></goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
FficeEventCategory.java
package com.github.swim_developer.extension.outbox.kafka.ffice;
public enum FficeEventCategory {
FLIGHT_PLAN,
FLIGHT_UPDATE,
OPERATIONS,
TRIAL,
SUBMISSION,
DATA,
UNKNOWN
}
FficeMessageClassifier.java
Classifies an XML payload by inspecting the <ffice:type> element value:
| XML type value | Category |
|---|---|
FILED_FLIGHT_PLAN, PRELIMINARY_FLIGHT_PLAN |
FLIGHT_PLAN |
FLIGHT_PLAN_UPDATE, PLANNING_STATUS, FILING_STATUS |
FLIGHT_UPDATE |
FLIGHT_DEPARTURE, FLIGHT_ARRIVAL, FLIGHT_CANCELLATION |
OPERATIONS |
TRIAL_REQUEST, TRIAL_RESPONSE |
TRIAL |
SUBMISSION_RESPONSE |
SUBMISSION |
FLIGHT_DATA_REQUEST, FLIGHT_DATA_RESPONSE |
DATA |
| anything else | UNKNOWN |
package com.github.swim_developer.extension.outbox.kafka.ffice;
public final class FficeMessageClassifier {
private static final String UNKNOWN_VALUE = "unknown";
private FficeMessageClassifier() {
}
public static FficeEventCategory classify(String xml) {
if (containsMessageType(xml, "FILED_FLIGHT_PLAN") || containsMessageType(xml, "PRELIMINARY_FLIGHT_PLAN")) {
return FficeEventCategory.FLIGHT_PLAN;
}
if (containsMessageType(xml, "FLIGHT_PLAN_UPDATE") || containsMessageType(xml, "PLANNING_STATUS")
|| containsMessageType(xml, "FILING_STATUS")) {
return FficeEventCategory.FLIGHT_UPDATE;
}
if (containsMessageType(xml, "FLIGHT_DEPARTURE") || containsMessageType(xml, "FLIGHT_ARRIVAL")
|| containsMessageType(xml, "FLIGHT_CANCELLATION")) {
return FficeEventCategory.OPERATIONS;
}
if (containsMessageType(xml, "TRIAL_REQUEST") || containsMessageType(xml, "TRIAL_RESPONSE")) {
return FficeEventCategory.TRIAL;
}
if (containsMessageType(xml, "SUBMISSION_RESPONSE")) {
return FficeEventCategory.SUBMISSION;
}
if (containsMessageType(xml, "FLIGHT_DATA_REQUEST") || containsMessageType(xml, "FLIGHT_DATA_RESPONSE")) {
return FficeEventCategory.DATA;
}
return FficeEventCategory.UNKNOWN;
}
public static String extractGufi(String xml) {
int start = xml.indexOf("<globallyUniqueFlightIdentifier>");
if (start == -1) {
int nsStart = xml.indexOf(":globallyUniqueFlightIdentifier>");
if (nsStart == -1) return UNKNOWN_VALUE;
start = xml.indexOf(">", nsStart) + 1;
} else {
start += "<globallyUniqueFlightIdentifier>".length();
}
int end = xml.indexOf("</", start);
if (end == -1) return UNKNOWN_VALUE;
String value = xml.substring(start, end).trim();
return value.isEmpty() ? UNKNOWN_VALUE : value;
}
private static boolean containsMessageType(String xml, String type) {
return xml.contains(">" + type + "<") || xml.contains("\"" + type + "\"");
}
}
FficeKafkaOutboxRouter.java
package com.github.swim_developer.extension.outbox.kafka.ffice;
import com.github.swim_developer.framework.infrastructure.out.messaging.AbstractOutboxRouter;
import io.micrometer.core.instrument.MeterRegistry;
import io.smallrye.reactive.messaging.kafka.Record;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
@Slf4j
@ApplicationScoped
public class FficeKafkaOutboxRouter extends AbstractOutboxRouter {
private final Emitter<Record<String, String>> flightPlanEmitter;
private final Emitter<Record<String, String>> flightUpdateEmitter;
private final Emitter<Record<String, String>> operationsEmitter;
private final Emitter<Record<String, String>> trialEmitter;
private final Emitter<Record<String, String>> submissionEmitter;
private final Emitter<Record<String, String>> dataEmitter;
private final Emitter<Record<String, String>> dlqEmitter;
@Inject
public FficeKafkaOutboxRouter(
MeterRegistry meterRegistry,
@Channel("out-flight-plan") Emitter<Record<String, String>> flightPlanEmitter,
@Channel("out-flight-update") Emitter<Record<String, String>> flightUpdateEmitter,
@Channel("out-operations") Emitter<Record<String, String>> operationsEmitter,
@Channel("out-trial") Emitter<Record<String, String>> trialEmitter,
@Channel("out-submission") Emitter<Record<String, String>> submissionEmitter,
@Channel("out-data") Emitter<Record<String, String>> dataEmitter,
@Channel("out-ffice-dlq") Emitter<Record<String, String>> dlqEmitter) {
super(meterRegistry);
this.flightPlanEmitter = flightPlanEmitter;
this.flightUpdateEmitter = flightUpdateEmitter;
this.operationsEmitter = operationsEmitter;
this.trialEmitter = trialEmitter;
this.submissionEmitter = submissionEmitter;
this.dataEmitter = dataEmitter;
this.dlqEmitter = dlqEmitter;
}
@Override
public void route(String messageId, String payload) {
FficeEventCategory category = FficeMessageClassifier.classify(payload);
String gufi = FficeMessageClassifier.extractGufi(payload);
incrementCounter("ffice_events_processed_total", "type", category.name(), "gufi", gufi);
Record<String, String> kafkaRecord = Record.of(messageId, payload);
String topicName = emit(category, kafkaRecord);
log.debug("FF-ICE event sent to Kafka - MessageId: {}, Topic: {}, Category: {}, GUFI: {}",
messageId, topicName, category, gufi);
}
private String emit(FficeEventCategory category, Record<String, String> kafkaRecord) {
return switch (category) {
case FLIGHT_PLAN -> {
flightPlanEmitter.send(kafkaRecord);
yield "ffice-flight-plan-topic";
}
case FLIGHT_UPDATE -> {
flightUpdateEmitter.send(kafkaRecord);
yield "ffice-flight-update-topic";
}
case OPERATIONS -> {
operationsEmitter.send(kafkaRecord);
yield "ffice-operations-topic";
}
case TRIAL -> {
trialEmitter.send(kafkaRecord);
yield "ffice-trial-topic";
}
case SUBMISSION -> {
submissionEmitter.send(kafkaRecord);
yield "ffice-submission-topic";
}
case DATA -> {
dataEmitter.send(kafkaRecord);
yield "ffice-data-topic";
}
default -> {
dlqEmitter.send(kafkaRecord);
yield "ffice-dlq-topic";
}
};
}
@Override
protected Emitter<Record<String, String>> getDlqEmitter() {
return dlqEmitter;
}
@Override
protected String getServiceLabel() {
return "FF-ICE";
}
}
2.2 Register the module
Add the new module to the parent pom.xml of swim-developer-extensions:
<module>swim-outbox-kafka-ffice</module>
2.3 Build and install
From the swim-developer-extensions directory:
./mvnw clean install -DskipTests # Linux / macOS
mvnw.cmd clean install -DskipTests # Windows