Part 3: Consumer Service (swim-ffice-consumer)
3.1 Generate the project
Choose a working directory for your new project and run:
mvn archetype:generate \
-DarchetypeGroupId=com.github.swim-developer \
-DarchetypeArtifactId=swim-consumer-archetype \
-DarchetypeVersion=1.0.0-SNAPSHOT \
-DgroupId=com.github.swim_developer \
-DartifactId=swim-ffice-consumer \
-Dversion=1.0.0-SNAPSHOT \
-DserviceName=ffice \
-DserviceDisplayName="FF-ICE" \
-DservicePrefix=Ffice \
-DdataModel=FIXM \
-DcollectionPrefix=ffice \
-DmodelArtifactId=swim-fixm-ffice-model \
-DinteractiveMode=false
Enter the generated project:
cd swim-ffice-consumer
chmod +x mvnw # Linux / macOS only
On Windows, use mvnw.cmd instead of ./mvnw in all subsequent commands.
The archetype generates 38 Java classes. 10 require domain-specific implementation. The remaining 28 work out of the box.
3.2 Add dependencies to pom.xml
Add these dependencies:
<dependency>
<groupId>com.github.swim-developer</groupId>
<artifactId>swim-fixm-ffice-model</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.github.swim-developer</groupId>
<artifactId>swim-outbox-kafka-ffice</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.github.swim-developer</groupId>
<artifactId>swim-inbox-store-kafka</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.github.swim-developer</groupId>
<artifactId>swim-inbox-reader-kafka</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.github.swim-developer</groupId>
<artifactId>swim-framework-persistence-mongodb</artifactId>
<version>${project.version}</version>
</dependency>
The Jandex index entries for these modules (so Quarkus discovers their beans) are added in Step 3.3 together with the Kafka channel configuration.
3.3 Configure Kafka serializers
In application.properties, add the following outgoing Kafka channels (connectors, topics, serializers, compression, and acks) for default event routing and FF-ICE domain outbox routing:
mp.messaging.outgoing.out-events.connector=smallrye-kafka
mp.messaging.outgoing.out-events.topic=ffice-events-topic
mp.messaging.outgoing.out-events.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.out-events.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.out-events.compression.type=lz4
mp.messaging.outgoing.out-events.acks=1
mp.messaging.outgoing.out-events.max-inflight-messages=${KAFKA_MAX_INFLIGHT:0}
mp.messaging.outgoing.out-events.buffer-size=${KAFKA_BUFFER_SIZE:2048}
mp.messaging.outgoing.out-events.waitForWriteCompletion=false
mp.messaging.outgoing.out-events.tracing-enabled=true
mp.messaging.outgoing.out-dlq.connector=smallrye-kafka
mp.messaging.outgoing.out-dlq.topic=ffice-events-dlq-topic
mp.messaging.outgoing.out-dlq.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.out-dlq.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.out-dlq.compression.type=lz4
mp.messaging.outgoing.out-dlq.acks=1
mp.messaging.outgoing.out-dlq.max-inflight-messages=${KAFKA_MAX_INFLIGHT:0}
mp.messaging.outgoing.out-dlq.buffer-size=${KAFKA_BUFFER_SIZE:2048}
mp.messaging.outgoing.out-dlq.waitForWriteCompletion=false
mp.messaging.outgoing.out-dlq.tracing-enabled=true
mp.messaging.outgoing.out-flight-plan.connector=smallrye-kafka
mp.messaging.outgoing.out-flight-plan.topic=ffice-flight-plan-topic
mp.messaging.outgoing.out-flight-plan.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.out-flight-plan.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.out-flight-plan.compression.type=lz4
mp.messaging.outgoing.out-flight-plan.acks=1
mp.messaging.outgoing.out-flight-update.connector=smallrye-kafka
mp.messaging.outgoing.out-flight-update.topic=ffice-flight-update-topic
mp.messaging.outgoing.out-flight-update.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.out-flight-update.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.out-flight-update.compression.type=lz4
mp.messaging.outgoing.out-flight-update.acks=1
mp.messaging.outgoing.out-operations.connector=smallrye-kafka
mp.messaging.outgoing.out-operations.topic=ffice-operations-topic
mp.messaging.outgoing.out-operations.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.out-operations.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.out-operations.compression.type=lz4
mp.messaging.outgoing.out-operations.acks=1
mp.messaging.outgoing.out-trial.connector=smallrye-kafka
mp.messaging.outgoing.out-trial.topic=ffice-trial-topic
mp.messaging.outgoing.out-trial.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.out-trial.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.out-trial.compression.type=lz4
mp.messaging.outgoing.out-trial.acks=1
mp.messaging.outgoing.out-submission.connector=smallrye-kafka
mp.messaging.outgoing.out-submission.topic=ffice-submission-topic
mp.messaging.outgoing.out-submission.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.out-submission.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.out-submission.compression.type=lz4
mp.messaging.outgoing.out-submission.acks=1
mp.messaging.outgoing.out-data.connector=smallrye-kafka
mp.messaging.outgoing.out-data.topic=ffice-data-topic
mp.messaging.outgoing.out-data.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.out-data.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.out-data.compression.type=lz4
mp.messaging.outgoing.out-data.acks=1
mp.messaging.outgoing.out-ffice-dlq.connector=smallrye-kafka
mp.messaging.outgoing.out-ffice-dlq.topic=ffice-dlq-topic
mp.messaging.outgoing.out-ffice-dlq.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.out-ffice-dlq.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.out-ffice-dlq.compression.type=lz4
mp.messaging.outgoing.out-ffice-dlq.acks=1
Add Jandex index entries for external modules (model, outbox extension, inbox store, inbox reader):
# =============================================================================
# Jandex Index - External Modules
# =============================================================================
quarkus.index-dependency.swim-fixm-ffice-model.group-id=com.github.swim-developer
quarkus.index-dependency.swim-fixm-ffice-model.artifact-id=swim-fixm-ffice-model
quarkus.index-dependency.swim-outbox-kafka-ffice.group-id=com.github.swim-developer
quarkus.index-dependency.swim-outbox-kafka-ffice.artifact-id=swim-outbox-kafka-ffice
quarkus.index-dependency.swim-inbox-store-kafka.group-id=com.github.swim-developer
quarkus.index-dependency.swim-inbox-store-kafka.artifact-id=swim-inbox-store-kafka
quarkus.index-dependency.swim-inbox-reader-kafka.group-id=com.github.swim-developer
quarkus.index-dependency.swim-inbox-reader-kafka.artifact-id=swim-inbox-reader-kafka
3.4 Register the Caffeine cache
The framework uses Quarkus Caffeine cache for idempotency. The cache declared in application.properties is only initialized at build time if referenced by a @CacheName annotation. Create a simple registration bean:
package com.github.swim_developer.infrastructure;
import io.quarkus.cache.CacheName;
import io.quarkus.cache.Cache;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class CacheRegistration {
@CacheName("processed-messages")
Cache processedMessagesCache;
}
3.5 Implement the 10 domain classes
These are the classes that contain // TODO markers. Replace each one with the full implementation below.
1. EventExtractor.java
infrastructure/out/xml/EventExtractor.java - Extracts FF-ICE domain metadata from the JAXB object graph into the Event entity.
package com.github.swim_developer.infrastructure.out.xml;
import aero.fixm.base.AerodromeReferenceType;
import aero.fixm.base.GloballyUniqueFlightIdentifierType;
import aero.fixm.ffice.FficeMessageType;
import aero.fixm.ffice.MessageTypeType;
import aero.fixm.flight.ArrivalType;
import aero.fixm.flight.DepartureType;
import aero.fixm.flight.FlightIdentificationType;
import aero.fixm.flight.FlightType;
import com.github.swim_developer.domain.model.Event;
import com.github.swim_developer.framework.application.port.out.SwimEventExtractor;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.xml.bind.JAXBElement;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Optional;
@Slf4j
@ApplicationScoped
public class EventExtractor implements SwimEventExtractor<Event, FficeMessageType> {
@Override
public String getTypeLabel(Event event) {
return event.getFficeMessageType() != null ? event.getFficeMessageType() : "unknown";
}
@Override
public List<Optional<Event>> extract(FficeMessageType fficeMessage) {
if (fficeMessage == null) {
return List.of(Optional.empty());
}
Event event = new Event();
MessageTypeType msgType = fficeMessage.getType();
if (msgType != null) {
event.setFficeMessageType(msgType.name());
}
if (fficeMessage.getTimestamp() != null) {
event.setMessageTimestamp(fficeMessage.getTimestamp().toString());
}
if (fficeMessage.getUniqueMessageIdentifier() != null) {
event.setUniqueMessageIdentifier(fficeMessage.getUniqueMessageIdentifier().getValue());
}
FlightType flight = fficeMessage.getFlight();
if (flight != null) {
extractFlightData(event, flight);
}
return List.of(Optional.of(event));
}
private void extractFlightData(Event event, FlightType flight) {
FlightIdentificationType flightId = unwrap(flight.getFlightIdentification());
if (flightId != null) {
GloballyUniqueFlightIdentifierType gufi = unwrap(flightId.getGufi());
if (gufi != null) {
event.setGufi(gufi.getValue());
}
String acId = unwrap(flightId.getAircraftIdentification());
if (acId != null) {
event.setAircraftIdentification(acId);
}
}
DepartureType departure = unwrap(flight.getDeparture());
if (departure != null) {
event.setDepartureAerodrome(extractLocationIndicator(departure.getDepartureAerodrome()));
}
ArrivalType arrival = unwrap(flight.getArrival());
if (arrival != null) {
String dest = extractLocationIndicator(arrival.getDestinationAerodrome());
if (dest == null) {
dest = extractLocationIndicator(arrival.getArrivalAerodrome());
}
event.setArrivalAerodrome(dest);
}
}
private String extractLocationIndicator(JAXBElement<AerodromeReferenceType> element) {
AerodromeReferenceType ref = unwrap(element);
if (ref == null) {
return null;
}
return unwrap(ref.getLocationIndicator());
}
private static <T> T unwrap(JAXBElement<T> element) {
return (element != null && element.getValue() != null) ? element.getValue() : null;
}
}
2. JaxbUnmarshallerPool.java
infrastructure/out/xml/JaxbUnmarshallerPool.java - Adapts the model's FficeUnmarshallerPool to the framework's SwimXmlUnmarshallerPort.
package com.github.swim_developer.infrastructure.out.xml;
import aero.fixm.ffice.FficeMessageType;
import aero.fixm.ffice.validation.FficeUnmarshallerPool;
import com.github.swim_developer.framework.application.port.out.SwimXmlUnmarshallerPort;
import com.github.swim_developer.framework.domain.exception.XmlValidationException;
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@ApplicationScoped
public class JaxbUnmarshallerPool implements SwimXmlUnmarshallerPort<FficeMessageType> {
private FficeUnmarshallerPool pool;
@PostConstruct
void init() {
this.pool = new FficeUnmarshallerPool();
log.info("FF-ICE JAXB unmarshaller pool initialized");
}
@Override
public FficeMessageType unmarshalAndValidate(String xml) throws XmlValidationException {
try {
Object result = pool.unmarshalAndValidate(xml);
if (result instanceof FficeMessageType fficeMessage) {
return fficeMessage;
}
throw new XmlValidationException("Unexpected JAXB root type: " + result.getClass().getName());
} catch (FficeUnmarshallerPool.FficeUnmarshalException e) {
throw new XmlValidationException(e.getMessage(), e);
}
}
}
3. XmlEnvelopeParser.java
infrastructure/out/xml/XmlEnvelopeParser.java - Each FF-ICE AMQP message contains a single FficeMessage, so the implementation returns the input as-is.
package com.github.swim_developer.infrastructure.out.xml;
import jakarta.enterprise.context.ApplicationScoped;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
@Slf4j
@ApplicationScoped
public class XmlEnvelopeParser {
// TODO: Implement envelope splitting logic for your XML format
// DNOTAM uses AIXMBasicMessage which may contain multiple members
// ED-254 passes through single messages
// Your service should split according to its data model
public List<String> splitEnvelope(String rawPayload) {
return List.of(rawPayload);
}
}
4. EventDataValidator.java
application/service/EventDataValidator.java - Validates the extracted event data. Warns if fficeMessageType is missing.
package com.github.swim_developer.application.service;
import com.github.swim_developer.domain.model.Event;
import com.github.swim_developer.framework.application.model.ProcessingContext;
import com.github.swim_developer.framework.consumer.application.messaging.processing.SwimEventValidator;
import jakarta.enterprise.context.ApplicationScoped;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@ApplicationScoped
public class EventDataValidator implements SwimEventValidator<Event> {
@Override
public void validateExtractedData(ProcessingContext ctx, Event event) {
if (event.getFficeMessageType() == null || event.getFficeMessageType().isBlank()) {
log.warn("FF-ICE message type is missing - MessageId: {}", ctx.compositeMessageId());
}
}
}
5. EventFilterService.java
application/service/EventFilterService.java - Returns an empty list of filter rules (no domain-specific filtering initially).
package com.github.swim_developer.application.service;
import com.github.swim_developer.domain.model.Event;
import com.github.swim_developer.framework.application.port.out.SwimDeadLetterPort;
import com.github.swim_developer.framework.application.port.out.SwimSubscriptionFilterPort;
import com.github.swim_developer.framework.consumer.application.messaging.processing.AbstractEventFilterService;
import com.github.swim_developer.framework.consumer.application.messaging.processing.FilterRule;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.List;
@ApplicationScoped
public class EventFilterService extends AbstractEventFilterService<Event> {
@Inject
public EventFilterService(SwimSubscriptionFilterPort filterCache,
SwimDeadLetterPort deadLetterService) {
super(filterCache, deadLetterService);
}
@Override
protected List<FilterRule<Event>> buildFilterRules(Event event) {
return List.of();
}
}
6. EventPersistenceService.java
application/service/EventPersistenceService.java - Populates the event entity with audit fields and delegates persistence.
package com.github.swim_developer.application.service;
import com.github.swim_developer.application.port.out.EventStore;
import com.github.swim_developer.domain.model.Event;
import com.github.swim_developer.framework.application.model.OutboxDeliveryStatus;
import com.github.swim_developer.framework.application.model.ProcessingContext;
import com.github.swim_developer.framework.application.port.out.SwimDeadLetterPort;
import com.github.swim_developer.framework.consumer.application.messaging.outbox.OutboxRouterFanOut;
import com.github.swim_developer.framework.consumer.application.messaging.processing.AbstractEventPersistenceService;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.time.Instant;
import java.util.List;
@ApplicationScoped
public class EventPersistenceService extends AbstractEventPersistenceService<Event, Event> {
private final EventStore repository;
@Inject
public EventPersistenceService(EventStore repository,
OutboxRouterFanOut outboxRouterFanOut,
SwimDeadLetterPort deadLetterService) {
super(outboxRouterFanOut, deadLetterService);
this.repository = repository;
}
@Override
protected Event assembleEntity(ProcessingContext ctx, Event event, String contentHash) {
event.setSubscriptionId(ctx.subscriptionId());
event.setMessageId(ctx.amqpMessageId());
event.setRawPayload(ctx.xmlPayload());
event.setContentHash(contentHash);
event.setDeliveryStatus(OutboxDeliveryStatus.SENT);
event.setDispatchedAt(Instant.now());
return event;
}
@Override
protected void persistEntity(Event entity) { repository.persist(entity); }
@Override
protected void persistEntities(List<Event> entities) { repository.persist(entities); }
@Override
protected void updateEntity(Event entity) { repository.update(entity); }
@Override
protected String getServicePrefix() { return "FF-ICE"; }
}
7. ProcessorCallbacks.java
application/service/ProcessorCallbacks.java - Lifecycle hooks for event processing: paused subscription check, duplicate detection, error logging.
package com.github.swim_developer.application.service;
import com.github.swim_developer.application.port.out.SubscriptionStore;
import com.github.swim_developer.domain.model.Event;
import com.github.swim_developer.domain.model.Subscription;
import com.github.swim_developer.framework.application.model.ProcessingContext;
import com.github.swim_developer.framework.consumer.application.messaging.processing.SwimEventProcessorCallbacks;
import com.github.swim_developer.framework.consumer.application.messaging.processing.SwimEventProcessorConfig;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import java.util.Optional;
@Slf4j
@ApplicationScoped
public class ProcessorCallbacks implements SwimEventProcessorCallbacks<Event> {
private final ProcessingMetrics metrics;
private final SubscriptionStore subscriptionStore;
@Inject
public ProcessorCallbacks(ProcessingMetrics metrics, SubscriptionStore subscriptionStore) {
this.metrics = metrics;
this.subscriptionStore = subscriptionStore;
}
@Override
public boolean preProcess(ProcessingContext ctx) {
Optional<Subscription> sub = subscriptionStore.findBySubscriptionId(ctx.subscriptionId());
if (sub.isPresent() && "PAUSED".equals(sub.get().getSubscriptionStatus())) {
log.warn("PAUSED_SUBSCRIPTION_DISCARD: SubscriptionId={}, MessageId={}",
ctx.subscriptionId(), ctx.amqpMessageId());
return true;
}
return false;
}
@Override
public void onDuplicateDetected(ProcessingContext ctx, String contentHash) {
metrics.incrementDuplicate();
}
@Override
public void onExtractionFailure(ProcessingContext ctx, SwimEventProcessorConfig config) {
metrics.incrementInvalid("INVALID");
log.error("Invalid FF-ICE message - MessageId: {}", ctx.compositeMessageId());
}
@Override
public void onValidationFailure(ProcessingContext ctx, Exception e) {
log.error("Problematic XML (first 500 chars): {}",
ctx.xmlPayload().length() > 500 ? ctx.xmlPayload().substring(0, 500) : ctx.xmlPayload());
}
}
8. FficeSubscriptionRenewalStrategy.java
infrastructure/out/subscription/FficeSubscriptionRenewalStrategy.java - Finds subscriptions near expiry and renews them via the Subscription Manager REST API. The archetype generates this file with the correct Ffice prefix.
package com.github.swim_developer.infrastructure.out.subscription;
import com.github.swim_developer.domain.model.Subscription;
import com.github.swim_developer.application.port.out.SubscriptionStore;
import com.github.swim_developer.infrastructure.out.client.SubscriptionManagerAdapter;
import com.github.swim_developer.infrastructure.out.client.SubscriptionManagerRestClient;
import com.github.swim_developer.infrastructure.in.rest.dto.SubscriptionResponse;
import com.github.swim_developer.framework.consumer.infrastructure.out.config.provider.ProviderConfigParser;
import com.github.swim_developer.framework.application.model.ProviderConfiguration;
import com.github.swim_developer.framework.domain.model.SubscriptionRenewalInfo;
import com.github.swim_developer.framework.domain.exception.SubscriptionRenewalException;
import com.github.swim_developer.framework.domain.model.SubscriptionStatus;
import com.github.swim_developer.framework.application.port.out.SubscriptionRenewalStrategy;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import java.time.Instant;
import java.util.List;
@Slf4j
@ApplicationScoped
public class FficeSubscriptionRenewalStrategy implements SubscriptionRenewalStrategy {
private final SubscriptionStore subscriptionStore;
private final SubscriptionManagerAdapter smClientRegistry;
private final ProviderConfigParser providerConfigParser;
@Inject
public FficeSubscriptionRenewalStrategy(SubscriptionStore subscriptionStore,
SubscriptionManagerAdapter smClientRegistry,
ProviderConfigParser providerConfigParser) {
this.subscriptionStore = subscriptionStore;
this.smClientRegistry = smClientRegistry;
this.providerConfigParser = providerConfigParser;
}
@Override
public List<SubscriptionRenewalInfo> findSubscriptionsNearExpiry(Instant threshold) {
return subscriptionStore.findBySubscriptionEndBefore(threshold)
.stream()
.filter(sub -> SubscriptionStatus.ACTIVE.name().equals(sub.getSubscriptionStatus()))
.map(sub -> new SubscriptionRenewalInfo(sub.getSubscriptionId(), sub.getSubscriptionEnd()))
.toList();
}
@Override
public void renewSubscription(String subscriptionId) throws SubscriptionRenewalException {
log.info("Renewing subscription: {}", subscriptionId);
Subscription subscription = subscriptionStore.findBySubscriptionId(subscriptionId)
.orElseThrow(() -> new IllegalStateException("Subscription not found: " + subscriptionId));
SubscriptionManagerRestClient client = resolveSmClient(subscription.getProviderId());
SubscriptionResponse response = client.renewSubscription(subscriptionId);
subscription.setSubscriptionEnd(response.subscriptionEnd());
subscriptionStore.updateSubscription(subscription);
log.info("Subscription renewed - ID: {}, New end: {}", subscriptionId, response.subscriptionEnd());
}
private SubscriptionManagerRestClient resolveSmClient(String providerId) {
ProviderConfiguration provider = providerConfigParser.findByProviderId(providerId)
.orElseThrow(() -> new IllegalStateException("Provider not configured: " + providerId));
return smClientRegistry.getOrCreate(provider);
}
}
9. InboxMessageHandler.java
infrastructure/in/amqp/InboxMessageHandler.java - Reads from the Kafka inbox topic, processes batches of FF-ICE events.
package com.github.swim_developer.infrastructure.in.amqp;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.swim_developer.application.usecase.EventProcessingUseCase;
import com.github.swim_developer.domain.model.Event;
import com.github.swim_developer.infrastructure.out.xml.XmlEnvelopeParser;
import com.github.swim_developer.extension.inbox.reader.kafka.AbstractKafkaInboxReader;
import com.github.swim_developer.framework.application.model.PreparedEvent;
import com.github.swim_developer.framework.application.model.ProcessingOutcome;
import com.github.swim_developer.framework.infrastructure.out.messaging.InboxEnvelope;
import io.micrometer.core.instrument.MeterRegistry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.instrumentation.annotations.WithSpan;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.reactive.messaging.kafka.KafkaRecordBatch;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import java.util.List;
import java.util.concurrent.CompletionStage;
@Slf4j
@ApplicationScoped
public class InboxMessageHandler extends AbstractKafkaInboxReader {
private final EventProcessingUseCase eventProcessor;
private final XmlEnvelopeParser envelopeParser;
protected InboxMessageHandler() {
this(null, null, null, null);
}
@Inject
public InboxMessageHandler(ObjectMapper objectMapper,
MeterRegistry meterRegistry,
EventProcessingUseCase eventProcessor,
XmlEnvelopeParser envelopeParser) {
super(objectMapper, meterRegistry);
this.eventProcessor = eventProcessor;
this.envelopeParser = envelopeParser;
}
// TODO: Update channel name to match your Kafka inbox topic config
@Incoming("in-ffice-inbox")
@Blocking
public CompletionStage<Void> onInboxBatch(KafkaRecordBatch<String, String> batch) {
List<PreparedEvent<Event>> prepared = prepareBatch(batch, eventProcessor.eventProcessingOrchestrator());
if (!prepared.isEmpty()) {
eventProcessor.batchPersistAndDispatch(prepared);
eventProcessor.markBatchAsProcessed(prepared);
}
processedCounter.increment(prepared.size());
return batch.ack();
}
@Override
public List<String> extractMessages(String rawPayload) {
return envelopeParser.splitEnvelope(rawPayload);
}
@WithSpan("ffice.consumer.event.process")
@Override
public void processSingleMessage(InboxEnvelope envelope, String xmlPayload, int index) {
Span.current().setAttribute("ffice.subscription", envelope.subscriptionId());
Span.current().setAttribute("ffice.queue", envelope.queueName());
ProcessingOutcome outcome = eventProcessor.processAndPersistSingleMessage(
envelope.subscriptionId(),
envelope.queueName(),
envelope.amqpMessageId(),
xmlPayload,
index);
Span.current().setAttribute("ffice.outcome", outcome.name());
}
@Override
public String getMetricPrefix() {
return "ffice";
}
}
10. OutboxMessageHandler.java
infrastructure/out/messaging/OutboxMessageHandler.java - Processes outbox events with fault tolerance annotations.
package com.github.swim_developer.infrastructure.out.messaging;
import com.github.swim_developer.framework.consumer.application.messaging.outbox.AbstractOutboxEventConsumer;
import com.github.swim_developer.framework.consumer.application.messaging.outbox.OutboxRouterFanOut;
import com.github.swim_developer.framework.consumer.application.port.out.SwimOutboxRetryPort;
import com.github.swim_developer.framework.domain.model.SwimOutboxEvent;
import com.github.swim_developer.framework.infrastructure.out.cache.HandoffCache;
import com.github.swim_developer.domain.model.Event;
import com.github.swim_developer.infrastructure.out.persistence.MongoEventStore;
import io.opentelemetry.instrumentation.annotations.WithSpan;
import io.quarkus.vertx.ConsumeEvent;
import io.smallrye.common.annotation.Blocking;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.faulttolerance.Bulkhead;
import org.eclipse.microprofile.faulttolerance.Retry;
import org.eclipse.microprofile.faulttolerance.Timeout;
import java.util.Optional;
@Slf4j
@ApplicationScoped
public class OutboxMessageHandler extends AbstractOutboxEventConsumer<Event> implements SwimOutboxRetryPort {
public static final String OUTBOX_EVENT_ADDRESS = "outbox.pending";
private final MongoEventStore eventRepository;
private final OutboxRouterFanOut outboxRouterFanOut;
private final HandoffCache handoffCache;
@Inject
public OutboxMessageHandler(MongoEventStore eventRepository,
OutboxRouterFanOut outboxRouterFanOut,
HandoffCache handoffCache,
@ConfigProperty(name = "swim.outbox.kafka.max-retries", defaultValue = "3") int maxKafkaRetries) {
super(maxKafkaRetries);
this.eventRepository = eventRepository;
this.outboxRouterFanOut = outboxRouterFanOut;
this.handoffCache = handoffCache;
}
@Override
@ConsumeEvent(OUTBOX_EVENT_ADDRESS)
@Blocking
@Timeout(10000)
@Retry(maxRetries = 3, delay = 1000)
@Bulkhead(250)
@WithSpan("ffice.consumer.outbox.kafka")
public void processOutboxEvent(String eventId) {
super.processOutboxEvent(eventId);
}
@Override
public void retryOutboxEvent(SwimOutboxEvent event) {
sendAndUpdateStatus((Event) event);
}
@Override
protected Event resolveEvent(String eventIdStr) {
Optional<Event> cached = handoffCache.getAndRemove(eventIdStr, Event.class);
if (cached.isPresent()) {
return cached.get();
}
return eventRepository.findEventById(eventIdStr);
}
@Override
protected OutboxRouterFanOut getRouterFanOut() { return outboxRouterFanOut; }
@Override
protected String getEventId(Event event) {
return event.getId() != null ? event.getId().toHexString() : null;
}
@Override
protected void updateEvent(Event event) { eventRepository.persistOrUpdate(event); }
public static String getEventAddress() { return OUTBOX_EVENT_ADDRESS; }
}
Create src/main/java/com/github/swim_developer/infrastructure/CacheRegistration.java:
package com.github.swim_developer.infrastructure;
import io.quarkus.cache.CacheName;
import io.quarkus.cache.Cache;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class CacheRegistration {
@CacheName("processed-messages")
Cache processedMessagesCache;
}
3.6 Replace Object with FficeMessageType in EventProcessingUseCase
The archetype generates EventProcessingUseCase.java with Object as a generic placeholder for the JAXB root type. You must replace it with the actual FF-ICE root type.
Open src/main/java/com/github/swim_developer/application/usecase/EventProcessingUseCase.java and make 4 replacements:
- Add the import at the top of the file (after the other imports):
import aero.fixm.ffice.FficeMessageType; - Replace all 4 occurrences of
ObjectwithFficeMessageType:
| Line (approx.) | Before | After |
|---|---|---|
| Field declaration | EventProcessingOrchestrator<Event, Object> |
EventProcessingOrchestrator<Event, FficeMessageType> |
| Constructor parameter | SwimXmlUnmarshallerPort<Object> jaxbPool |
SwimXmlUnmarshallerPort<FficeMessageType> jaxbPool |
| Parser reference | SwimEventParser<Object> parser |
SwimEventParser<FficeMessageType> parser |
| Getter return type | EventProcessingOrchestrator<Event, Object> |
EventProcessingOrchestrator<Event, FficeMessageType> |
- Delete the
// TODOcomment on the line above the field declaration.
After the edits, the file should look like this:
package com.github.swim_developer.application.usecase;
import com.github.swim_developer.application.service.ProcessingMetrics;
import com.github.swim_developer.application.port.out.SubscriptionStore;
import com.github.swim_developer.application.service.EventDataValidator;
import com.github.swim_developer.application.service.EventFilterService;
import com.github.swim_developer.application.service.EventPersistenceService;
import com.github.swim_developer.application.service.ProcessorCallbacks;
import com.github.swim_developer.domain.model.Event;
import aero.fixm.ffice.FficeMessageType;
import com.github.swim_developer.infrastructure.out.xml.EventExtractor;
import com.github.swim_developer.framework.consumer.application.messaging.processing.DefaultEventProcessorConfig;
import com.github.swim_developer.framework.application.model.PreparedEvent;
import com.github.swim_developer.framework.application.model.ProcessingContext;
import com.github.swim_developer.framework.application.model.ProcessingOutcome;
import com.github.swim_developer.framework.consumer.application.messaging.processing.EventProcessingOrchestrator;
import com.github.swim_developer.framework.consumer.application.messaging.processing.EventProcessingOrchestratorDependencies;
import com.github.swim_developer.framework.consumer.application.messaging.processing.SwimEventParser;
import com.github.swim_developer.framework.consumer.application.messaging.processing.SwimEventProcessorCallbacks;
import com.github.swim_developer.framework.application.port.in.SwimMessageInterceptor;
import com.github.swim_developer.framework.application.port.out.SwimXmlUnmarshallerPort;
import io.micrometer.core.instrument.MeterRegistry;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Any;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import java.util.List;
@ApplicationScoped
public class EventProcessingUseCase {
private final EventProcessingOrchestrator<Event, FficeMessageType> orchestrator;
private final EventPersistenceService persistenceService;
@Inject
public EventProcessingUseCase(
DefaultEventProcessorConfig processorConfig,
SwimXmlUnmarshallerPort<FficeMessageType> jaxbPool,
EventExtractor eventExtractor,
EventDataValidator validator,
EventFilterService filterService,
EventPersistenceService persistenceService,
ProcessingMetrics metrics,
MeterRegistry meterRegistry,
SubscriptionStore subscriptionStore,
@Any Instance<SwimMessageInterceptor> interceptorInstances) {
this.persistenceService = persistenceService;
SwimEventParser<FficeMessageType> parser = jaxbPool::unmarshalAndValidate;
SwimEventProcessorCallbacks<Event> callbacks = new ProcessorCallbacks(metrics, subscriptionStore);
this.orchestrator = new EventProcessingOrchestrator<>(new EventProcessingOrchestratorDependencies<>(
processorConfig, parser, eventExtractor, validator, filterService,
persistenceService, callbacks, meterRegistry, interceptorInstances));
}
public ProcessingOutcome processAndPersistSingleMessage(String subscriptionId, String queueName,
String amqpMessageId, String xml, int index) {
return orchestrator.processMessage(new ProcessingContext(subscriptionId, queueName, amqpMessageId, xml, index, null));
}
public EventProcessingOrchestrator<Event, FficeMessageType> eventProcessingOrchestrator() {
return orchestrator;
}
public void batchPersistAndDispatch(List<PreparedEvent<Event>> batch) {
persistenceService.batchPersistAndDispatch(batch);
}
public void markBatchAsProcessed(List<PreparedEvent<Event>> batch) {
orchestrator.markBatchAsProcessed(batch);
}
}
3.7 Add domain fields to Event
Add FF-ICE-specific fields to the Event entity class:
private String fficeMessageType;
private String gufi;
private String aircraftIdentification;
private String departureAerodrome;
private String arrivalAerodrome;
private String messageTimestamp;
private String uniqueMessageIdentifier;
3.8 Update EventDTO and SubscriptionMapper
The archetype generates a generic EventDTO with a deliveryStatus field. Replace it with the FF-ICE-specific fields that match the domain model.
Replace src/main/java/com/github/swim_developer/infrastructure/in/rest/dto/EventDTO.java with:
package com.github.swim_developer.infrastructure.in.rest.dto;
import java.time.Instant;
public record EventDTO(
String id,
String messageId,
String subscriptionId,
Instant receivedAt,
String fficeMessageType,
String gufi,
String aircraftIdentification,
String departureAerodrome,
String arrivalAerodrome
) {}
Update src/main/java/com/github/swim_developer/infrastructure/out/mapper/SubscriptionMapper.java to map the FF-ICE fields instead of deliveryStatus:
package com.github.swim_developer.infrastructure.out.mapper;
import com.github.swim_developer.framework.consumer.infrastructure.out.dlq.DeadLetterMessage;
import com.github.swim_developer.domain.model.Event;
import com.github.swim_developer.infrastructure.in.rest.dto.EventDTO;
import com.github.swim_developer.framework.infrastructure.out.messaging.DlqMessageDTO;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class SubscriptionMapper {
public EventDTO toDTO(Event event) {
return new EventDTO(
event.getId() != null ? event.getId().toHexString() : null,
event.getMessageId(),
event.getSubscriptionId(),
event.getReceivedAt(),
event.getFficeMessageType(),
event.getGufi(),
event.getAircraftIdentification(),
event.getDepartureAerodrome(),
event.getArrivalAerodrome()
);
}
public DlqMessageDTO toDTO(DeadLetterMessage dlq) {
return new DlqMessageDTO(
dlq.getId(),
dlq.getAmqpMessageId(),
dlq.getMessageIndex(),
dlq.getSubscriptionId(),
dlq.getQueueName(),
dlq.getErrorType(),
dlq.getErrorMessage(),
dlq.getRawPayload(),
dlq.getReceivedAt(),
dlq.getFailedAt()
);
}
}
3.9 Verify compilation
./mvnw clean package -DskipTests # Linux / macOS
mvnw.cmd clean package -DskipTests # Windows