Step 4 of 8

Part 3: Consumer Service (swim-ffice-consumer)

3.1 Generate the project

Choose a working directory for your new project and run:

mvn archetype:generate \
  -DarchetypeGroupId=com.github.swim-developer \
  -DarchetypeArtifactId=swim-consumer-archetype \
  -DarchetypeVersion=1.0.0-SNAPSHOT \
  -DgroupId=com.github.swim_developer \
  -DartifactId=swim-ffice-consumer \
  -Dversion=1.0.0-SNAPSHOT \
  -DserviceName=ffice \
  -DserviceDisplayName="FF-ICE" \
  -DservicePrefix=Ffice \
  -DdataModel=FIXM \
  -DcollectionPrefix=ffice \
  -DmodelArtifactId=swim-fixm-ffice-model \
  -DinteractiveMode=false

Enter the generated project:

cd swim-ffice-consumer
chmod +x mvnw                            # Linux / macOS only

On Windows, use mvnw.cmd instead of ./mvnw in all subsequent commands.

The archetype generates 38 Java classes. 10 require domain-specific implementation. The remaining 28 work out of the box.

3.2 Add dependencies to pom.xml

Add these dependencies:

<dependency>
    <groupId>com.github.swim-developer</groupId>
    <artifactId>swim-fixm-ffice-model</artifactId>
    <version>${project.version}</version>
</dependency>
<dependency>
    <groupId>com.github.swim-developer</groupId>
    <artifactId>swim-outbox-kafka-ffice</artifactId>
    <version>${project.version}</version>
</dependency>
<dependency>
    <groupId>com.github.swim-developer</groupId>
    <artifactId>swim-inbox-store-kafka</artifactId>
    <version>${project.version}</version>
</dependency>
<dependency>
    <groupId>com.github.swim-developer</groupId>
    <artifactId>swim-inbox-reader-kafka</artifactId>
    <version>${project.version}</version>
</dependency>
<dependency>
    <groupId>com.github.swim-developer</groupId>
    <artifactId>swim-framework-persistence-mongodb</artifactId>
    <version>${project.version}</version>
</dependency>

The Jandex index entries for these modules (so Quarkus discovers their beans) are added in Step 3.3 together with the Kafka channel configuration.

3.3 Configure Kafka serializers

In application.properties, add the following outgoing Kafka channels (connectors, topics, serializers, compression, and acks) for default event routing and FF-ICE domain outbox routing:

mp.messaging.outgoing.out-events.connector=smallrye-kafka
mp.messaging.outgoing.out-events.topic=ffice-events-topic
mp.messaging.outgoing.out-events.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.out-events.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.out-events.compression.type=lz4
mp.messaging.outgoing.out-events.acks=1
mp.messaging.outgoing.out-events.max-inflight-messages=${KAFKA_MAX_INFLIGHT:0}
mp.messaging.outgoing.out-events.buffer-size=${KAFKA_BUFFER_SIZE:2048}
mp.messaging.outgoing.out-events.waitForWriteCompletion=false
mp.messaging.outgoing.out-events.tracing-enabled=true

mp.messaging.outgoing.out-dlq.connector=smallrye-kafka
mp.messaging.outgoing.out-dlq.topic=ffice-events-dlq-topic
mp.messaging.outgoing.out-dlq.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.out-dlq.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.out-dlq.compression.type=lz4
mp.messaging.outgoing.out-dlq.acks=1
mp.messaging.outgoing.out-dlq.max-inflight-messages=${KAFKA_MAX_INFLIGHT:0}
mp.messaging.outgoing.out-dlq.buffer-size=${KAFKA_BUFFER_SIZE:2048}
mp.messaging.outgoing.out-dlq.waitForWriteCompletion=false
mp.messaging.outgoing.out-dlq.tracing-enabled=true

mp.messaging.outgoing.out-flight-plan.connector=smallrye-kafka
mp.messaging.outgoing.out-flight-plan.topic=ffice-flight-plan-topic
mp.messaging.outgoing.out-flight-plan.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.out-flight-plan.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.out-flight-plan.compression.type=lz4
mp.messaging.outgoing.out-flight-plan.acks=1

mp.messaging.outgoing.out-flight-update.connector=smallrye-kafka
mp.messaging.outgoing.out-flight-update.topic=ffice-flight-update-topic
mp.messaging.outgoing.out-flight-update.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.out-flight-update.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.out-flight-update.compression.type=lz4
mp.messaging.outgoing.out-flight-update.acks=1

mp.messaging.outgoing.out-operations.connector=smallrye-kafka
mp.messaging.outgoing.out-operations.topic=ffice-operations-topic
mp.messaging.outgoing.out-operations.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.out-operations.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.out-operations.compression.type=lz4
mp.messaging.outgoing.out-operations.acks=1

mp.messaging.outgoing.out-trial.connector=smallrye-kafka
mp.messaging.outgoing.out-trial.topic=ffice-trial-topic
mp.messaging.outgoing.out-trial.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.out-trial.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.out-trial.compression.type=lz4
mp.messaging.outgoing.out-trial.acks=1

mp.messaging.outgoing.out-submission.connector=smallrye-kafka
mp.messaging.outgoing.out-submission.topic=ffice-submission-topic
mp.messaging.outgoing.out-submission.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.out-submission.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.out-submission.compression.type=lz4
mp.messaging.outgoing.out-submission.acks=1

mp.messaging.outgoing.out-data.connector=smallrye-kafka
mp.messaging.outgoing.out-data.topic=ffice-data-topic
mp.messaging.outgoing.out-data.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.out-data.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.out-data.compression.type=lz4
mp.messaging.outgoing.out-data.acks=1

mp.messaging.outgoing.out-ffice-dlq.connector=smallrye-kafka
mp.messaging.outgoing.out-ffice-dlq.topic=ffice-dlq-topic
mp.messaging.outgoing.out-ffice-dlq.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.out-ffice-dlq.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.out-ffice-dlq.compression.type=lz4
mp.messaging.outgoing.out-ffice-dlq.acks=1

Add Jandex index entries for external modules (model, outbox extension, inbox store, inbox reader):

# =============================================================================
# Jandex Index - External Modules
# =============================================================================
quarkus.index-dependency.swim-fixm-ffice-model.group-id=com.github.swim-developer
quarkus.index-dependency.swim-fixm-ffice-model.artifact-id=swim-fixm-ffice-model
quarkus.index-dependency.swim-outbox-kafka-ffice.group-id=com.github.swim-developer
quarkus.index-dependency.swim-outbox-kafka-ffice.artifact-id=swim-outbox-kafka-ffice
quarkus.index-dependency.swim-inbox-store-kafka.group-id=com.github.swim-developer
quarkus.index-dependency.swim-inbox-store-kafka.artifact-id=swim-inbox-store-kafka
quarkus.index-dependency.swim-inbox-reader-kafka.group-id=com.github.swim-developer
quarkus.index-dependency.swim-inbox-reader-kafka.artifact-id=swim-inbox-reader-kafka

3.4 Register the Caffeine cache

The framework uses Quarkus Caffeine cache for idempotency. The cache declared in application.properties is only initialized at build time if referenced by a @CacheName annotation. Create a simple registration bean:

package com.github.swim_developer.infrastructure;

import io.quarkus.cache.CacheName;
import io.quarkus.cache.Cache;
import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class CacheRegistration {

    @CacheName("processed-messages")
    Cache processedMessagesCache;
}

3.5 Implement the 10 domain classes

These are the classes that contain // TODO markers. Replace each one with the full implementation below.

1. EventExtractor.java

infrastructure/out/xml/EventExtractor.java - Extracts FF-ICE domain metadata from the JAXB object graph into the Event entity.

package com.github.swim_developer.infrastructure.out.xml;

import aero.fixm.base.AerodromeReferenceType;
import aero.fixm.base.GloballyUniqueFlightIdentifierType;
import aero.fixm.ffice.FficeMessageType;
import aero.fixm.ffice.MessageTypeType;
import aero.fixm.flight.ArrivalType;
import aero.fixm.flight.DepartureType;
import aero.fixm.flight.FlightIdentificationType;
import aero.fixm.flight.FlightType;
import com.github.swim_developer.domain.model.Event;
import com.github.swim_developer.framework.application.port.out.SwimEventExtractor;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.xml.bind.JAXBElement;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.Optional;

@Slf4j
@ApplicationScoped
public class EventExtractor implements SwimEventExtractor<Event, FficeMessageType> {

    @Override
    public String getTypeLabel(Event event) {
        return event.getFficeMessageType() != null ? event.getFficeMessageType() : "unknown";
    }

    @Override
    public List<Optional<Event>> extract(FficeMessageType fficeMessage) {
        if (fficeMessage == null) {
            return List.of(Optional.empty());
        }

        Event event = new Event();

        MessageTypeType msgType = fficeMessage.getType();
        if (msgType != null) {
            event.setFficeMessageType(msgType.name());
        }

        if (fficeMessage.getTimestamp() != null) {
            event.setMessageTimestamp(fficeMessage.getTimestamp().toString());
        }
        if (fficeMessage.getUniqueMessageIdentifier() != null) {
            event.setUniqueMessageIdentifier(fficeMessage.getUniqueMessageIdentifier().getValue());
        }

        FlightType flight = fficeMessage.getFlight();
        if (flight != null) {
            extractFlightData(event, flight);
        }

        return List.of(Optional.of(event));
    }

    private void extractFlightData(Event event, FlightType flight) {
        FlightIdentificationType flightId = unwrap(flight.getFlightIdentification());
        if (flightId != null) {
            GloballyUniqueFlightIdentifierType gufi = unwrap(flightId.getGufi());
            if (gufi != null) {
                event.setGufi(gufi.getValue());
            }

            String acId = unwrap(flightId.getAircraftIdentification());
            if (acId != null) {
                event.setAircraftIdentification(acId);
            }
        }

        DepartureType departure = unwrap(flight.getDeparture());
        if (departure != null) {
            event.setDepartureAerodrome(extractLocationIndicator(departure.getDepartureAerodrome()));
        }

        ArrivalType arrival = unwrap(flight.getArrival());
        if (arrival != null) {
            String dest = extractLocationIndicator(arrival.getDestinationAerodrome());
            if (dest == null) {
                dest = extractLocationIndicator(arrival.getArrivalAerodrome());
            }
            event.setArrivalAerodrome(dest);
        }
    }

    private String extractLocationIndicator(JAXBElement<AerodromeReferenceType> element) {
        AerodromeReferenceType ref = unwrap(element);
        if (ref == null) {
            return null;
        }
        return unwrap(ref.getLocationIndicator());
    }

    private static <T> T unwrap(JAXBElement<T> element) {
        return (element != null && element.getValue() != null) ? element.getValue() : null;
    }
}

2. JaxbUnmarshallerPool.java

infrastructure/out/xml/JaxbUnmarshallerPool.java - Adapts the model's FficeUnmarshallerPool to the framework's SwimXmlUnmarshallerPort.

package com.github.swim_developer.infrastructure.out.xml;

import aero.fixm.ffice.FficeMessageType;
import aero.fixm.ffice.validation.FficeUnmarshallerPool;
import com.github.swim_developer.framework.application.port.out.SwimXmlUnmarshallerPort;
import com.github.swim_developer.framework.domain.exception.XmlValidationException;
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@ApplicationScoped
public class JaxbUnmarshallerPool implements SwimXmlUnmarshallerPort<FficeMessageType> {

    private FficeUnmarshallerPool pool;

    @PostConstruct
    void init() {
        this.pool = new FficeUnmarshallerPool();
        log.info("FF-ICE JAXB unmarshaller pool initialized");
    }

    @Override
    public FficeMessageType unmarshalAndValidate(String xml) throws XmlValidationException {
        try {
            Object result = pool.unmarshalAndValidate(xml);
            if (result instanceof FficeMessageType fficeMessage) {
                return fficeMessage;
            }
            throw new XmlValidationException("Unexpected JAXB root type: " + result.getClass().getName());
        } catch (FficeUnmarshallerPool.FficeUnmarshalException e) {
            throw new XmlValidationException(e.getMessage(), e);
        }
    }
}

3. XmlEnvelopeParser.java

infrastructure/out/xml/XmlEnvelopeParser.java - Each FF-ICE AMQP message contains a single FficeMessage, so the implementation returns the input as-is.

package com.github.swim_developer.infrastructure.out.xml;

import jakarta.enterprise.context.ApplicationScoped;
import lombok.extern.slf4j.Slf4j;

import java.util.List;

@Slf4j
@ApplicationScoped
public class XmlEnvelopeParser {

    // TODO: Implement envelope splitting logic for your XML format
    // DNOTAM uses AIXMBasicMessage which may contain multiple members
    // ED-254 passes through single messages
    // Your service should split according to its data model
    public List<String> splitEnvelope(String rawPayload) {
        return List.of(rawPayload);
    }
}

4. EventDataValidator.java

application/service/EventDataValidator.java - Validates the extracted event data. Warns if fficeMessageType is missing.

package com.github.swim_developer.application.service;

import com.github.swim_developer.domain.model.Event;
import com.github.swim_developer.framework.application.model.ProcessingContext;
import com.github.swim_developer.framework.consumer.application.messaging.processing.SwimEventValidator;
import jakarta.enterprise.context.ApplicationScoped;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@ApplicationScoped
public class EventDataValidator implements SwimEventValidator<Event> {

    @Override
    public void validateExtractedData(ProcessingContext ctx, Event event) {
        if (event.getFficeMessageType() == null || event.getFficeMessageType().isBlank()) {
            log.warn("FF-ICE message type is missing - MessageId: {}", ctx.compositeMessageId());
        }
    }
}

5. EventFilterService.java

application/service/EventFilterService.java - Returns an empty list of filter rules (no domain-specific filtering initially).

package com.github.swim_developer.application.service;

import com.github.swim_developer.domain.model.Event;
import com.github.swim_developer.framework.application.port.out.SwimDeadLetterPort;
import com.github.swim_developer.framework.application.port.out.SwimSubscriptionFilterPort;
import com.github.swim_developer.framework.consumer.application.messaging.processing.AbstractEventFilterService;
import com.github.swim_developer.framework.consumer.application.messaging.processing.FilterRule;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import java.util.List;

@ApplicationScoped
public class EventFilterService extends AbstractEventFilterService<Event> {

    @Inject
    public EventFilterService(SwimSubscriptionFilterPort filterCache,
                              SwimDeadLetterPort deadLetterService) {
        super(filterCache, deadLetterService);
    }

    @Override
    protected List<FilterRule<Event>> buildFilterRules(Event event) {
        return List.of();
    }
}

6. EventPersistenceService.java

application/service/EventPersistenceService.java - Populates the event entity with audit fields and delegates persistence.

package com.github.swim_developer.application.service;

import com.github.swim_developer.application.port.out.EventStore;
import com.github.swim_developer.domain.model.Event;
import com.github.swim_developer.framework.application.model.OutboxDeliveryStatus;
import com.github.swim_developer.framework.application.model.ProcessingContext;
import com.github.swim_developer.framework.application.port.out.SwimDeadLetterPort;
import com.github.swim_developer.framework.consumer.application.messaging.outbox.OutboxRouterFanOut;
import com.github.swim_developer.framework.consumer.application.messaging.processing.AbstractEventPersistenceService;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import java.time.Instant;
import java.util.List;

@ApplicationScoped
public class EventPersistenceService extends AbstractEventPersistenceService<Event, Event> {

    private final EventStore repository;

    @Inject
    public EventPersistenceService(EventStore repository,
                                   OutboxRouterFanOut outboxRouterFanOut,
                                   SwimDeadLetterPort deadLetterService) {
        super(outboxRouterFanOut, deadLetterService);
        this.repository = repository;
    }

    @Override
    protected Event assembleEntity(ProcessingContext ctx, Event event, String contentHash) {
        event.setSubscriptionId(ctx.subscriptionId());
        event.setMessageId(ctx.amqpMessageId());
        event.setRawPayload(ctx.xmlPayload());
        event.setContentHash(contentHash);
        event.setDeliveryStatus(OutboxDeliveryStatus.SENT);
        event.setDispatchedAt(Instant.now());
        return event;
    }

    @Override
    protected void persistEntity(Event entity) { repository.persist(entity); }

    @Override
    protected void persistEntities(List<Event> entities) { repository.persist(entities); }

    @Override
    protected void updateEntity(Event entity) { repository.update(entity); }

    @Override
    protected String getServicePrefix() { return "FF-ICE"; }
}

7. ProcessorCallbacks.java

application/service/ProcessorCallbacks.java - Lifecycle hooks for event processing: paused subscription check, duplicate detection, error logging.

package com.github.swim_developer.application.service;

import com.github.swim_developer.application.port.out.SubscriptionStore;
import com.github.swim_developer.domain.model.Event;
import com.github.swim_developer.domain.model.Subscription;
import com.github.swim_developer.framework.application.model.ProcessingContext;
import com.github.swim_developer.framework.consumer.application.messaging.processing.SwimEventProcessorCallbacks;
import com.github.swim_developer.framework.consumer.application.messaging.processing.SwimEventProcessorConfig;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;

import java.util.Optional;

@Slf4j
@ApplicationScoped
public class ProcessorCallbacks implements SwimEventProcessorCallbacks<Event> {

    private final ProcessingMetrics metrics;
    private final SubscriptionStore subscriptionStore;

    @Inject
    public ProcessorCallbacks(ProcessingMetrics metrics, SubscriptionStore subscriptionStore) {
        this.metrics = metrics;
        this.subscriptionStore = subscriptionStore;
    }

    @Override
    public boolean preProcess(ProcessingContext ctx) {
        Optional<Subscription> sub = subscriptionStore.findBySubscriptionId(ctx.subscriptionId());
        if (sub.isPresent() && "PAUSED".equals(sub.get().getSubscriptionStatus())) {
            log.warn("PAUSED_SUBSCRIPTION_DISCARD: SubscriptionId={}, MessageId={}",
                    ctx.subscriptionId(), ctx.amqpMessageId());
            return true;
        }
        return false;
    }

    @Override
    public void onDuplicateDetected(ProcessingContext ctx, String contentHash) {
        metrics.incrementDuplicate();
    }

    @Override
    public void onExtractionFailure(ProcessingContext ctx, SwimEventProcessorConfig config) {
        metrics.incrementInvalid("INVALID");
        log.error("Invalid FF-ICE message - MessageId: {}", ctx.compositeMessageId());
    }

    @Override
    public void onValidationFailure(ProcessingContext ctx, Exception e) {
        log.error("Problematic XML (first 500 chars): {}",
                ctx.xmlPayload().length() > 500 ? ctx.xmlPayload().substring(0, 500) : ctx.xmlPayload());
    }
}

8. FficeSubscriptionRenewalStrategy.java

infrastructure/out/subscription/FficeSubscriptionRenewalStrategy.java - Finds subscriptions near expiry and renews them via the Subscription Manager REST API. The archetype generates this file with the correct Ffice prefix.

package com.github.swim_developer.infrastructure.out.subscription;

import com.github.swim_developer.domain.model.Subscription;
import com.github.swim_developer.application.port.out.SubscriptionStore;
import com.github.swim_developer.infrastructure.out.client.SubscriptionManagerAdapter;
import com.github.swim_developer.infrastructure.out.client.SubscriptionManagerRestClient;
import com.github.swim_developer.infrastructure.in.rest.dto.SubscriptionResponse;
import com.github.swim_developer.framework.consumer.infrastructure.out.config.provider.ProviderConfigParser;
import com.github.swim_developer.framework.application.model.ProviderConfiguration;
import com.github.swim_developer.framework.domain.model.SubscriptionRenewalInfo;
import com.github.swim_developer.framework.domain.exception.SubscriptionRenewalException;
import com.github.swim_developer.framework.domain.model.SubscriptionStatus;
import com.github.swim_developer.framework.application.port.out.SubscriptionRenewalStrategy;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;

import java.time.Instant;
import java.util.List;

@Slf4j
@ApplicationScoped
public class FficeSubscriptionRenewalStrategy implements SubscriptionRenewalStrategy {

    private final SubscriptionStore subscriptionStore;
    private final SubscriptionManagerAdapter smClientRegistry;
    private final ProviderConfigParser providerConfigParser;

    @Inject
    public FficeSubscriptionRenewalStrategy(SubscriptionStore subscriptionStore,
                                            SubscriptionManagerAdapter smClientRegistry,
                                            ProviderConfigParser providerConfigParser) {
        this.subscriptionStore = subscriptionStore;
        this.smClientRegistry = smClientRegistry;
        this.providerConfigParser = providerConfigParser;
    }

    @Override
    public List<SubscriptionRenewalInfo> findSubscriptionsNearExpiry(Instant threshold) {
        return subscriptionStore.findBySubscriptionEndBefore(threshold)
                .stream()
                .filter(sub -> SubscriptionStatus.ACTIVE.name().equals(sub.getSubscriptionStatus()))
                .map(sub -> new SubscriptionRenewalInfo(sub.getSubscriptionId(), sub.getSubscriptionEnd()))
                .toList();
    }

    @Override
    public void renewSubscription(String subscriptionId) throws SubscriptionRenewalException {
        log.info("Renewing subscription: {}", subscriptionId);

        Subscription subscription = subscriptionStore.findBySubscriptionId(subscriptionId)
                .orElseThrow(() -> new IllegalStateException("Subscription not found: " + subscriptionId));

        SubscriptionManagerRestClient client = resolveSmClient(subscription.getProviderId());
        SubscriptionResponse response = client.renewSubscription(subscriptionId);

        subscription.setSubscriptionEnd(response.subscriptionEnd());
        subscriptionStore.updateSubscription(subscription);

        log.info("Subscription renewed - ID: {}, New end: {}", subscriptionId, response.subscriptionEnd());
    }

    private SubscriptionManagerRestClient resolveSmClient(String providerId) {
        ProviderConfiguration provider = providerConfigParser.findByProviderId(providerId)
                .orElseThrow(() -> new IllegalStateException("Provider not configured: " + providerId));
        return smClientRegistry.getOrCreate(provider);
    }
}

9. InboxMessageHandler.java

infrastructure/in/amqp/InboxMessageHandler.java - Reads from the Kafka inbox topic, processes batches of FF-ICE events.

package com.github.swim_developer.infrastructure.in.amqp;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.swim_developer.application.usecase.EventProcessingUseCase;
import com.github.swim_developer.domain.model.Event;
import com.github.swim_developer.infrastructure.out.xml.XmlEnvelopeParser;
import com.github.swim_developer.extension.inbox.reader.kafka.AbstractKafkaInboxReader;
import com.github.swim_developer.framework.application.model.PreparedEvent;
import com.github.swim_developer.framework.application.model.ProcessingOutcome;
import com.github.swim_developer.framework.infrastructure.out.messaging.InboxEnvelope;
import io.micrometer.core.instrument.MeterRegistry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.instrumentation.annotations.WithSpan;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.reactive.messaging.kafka.KafkaRecordBatch;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import java.util.List;
import java.util.concurrent.CompletionStage;

@Slf4j
@ApplicationScoped
public class InboxMessageHandler extends AbstractKafkaInboxReader {

    private final EventProcessingUseCase eventProcessor;
    private final XmlEnvelopeParser envelopeParser;

    protected InboxMessageHandler() {
        this(null, null, null, null);
    }

    @Inject
    public InboxMessageHandler(ObjectMapper objectMapper,
                               MeterRegistry meterRegistry,
                               EventProcessingUseCase eventProcessor,
                               XmlEnvelopeParser envelopeParser) {
        super(objectMapper, meterRegistry);
        this.eventProcessor = eventProcessor;
        this.envelopeParser = envelopeParser;
    }

    // TODO: Update channel name to match your Kafka inbox topic config
    @Incoming("in-ffice-inbox")
    @Blocking
    public CompletionStage<Void> onInboxBatch(KafkaRecordBatch<String, String> batch) {
        List<PreparedEvent<Event>> prepared = prepareBatch(batch, eventProcessor.eventProcessingOrchestrator());

        if (!prepared.isEmpty()) {
            eventProcessor.batchPersistAndDispatch(prepared);
            eventProcessor.markBatchAsProcessed(prepared);
        }

        processedCounter.increment(prepared.size());
        return batch.ack();
    }

    @Override
    public List<String> extractMessages(String rawPayload) {
        return envelopeParser.splitEnvelope(rawPayload);
    }

    @WithSpan("ffice.consumer.event.process")
    @Override
    public void processSingleMessage(InboxEnvelope envelope, String xmlPayload, int index) {
        Span.current().setAttribute("ffice.subscription", envelope.subscriptionId());
        Span.current().setAttribute("ffice.queue", envelope.queueName());

        ProcessingOutcome outcome = eventProcessor.processAndPersistSingleMessage(
                envelope.subscriptionId(),
                envelope.queueName(),
                envelope.amqpMessageId(),
                xmlPayload,
                index);
        Span.current().setAttribute("ffice.outcome", outcome.name());
    }

    @Override
    public String getMetricPrefix() {
        return "ffice";
    }
}

10. OutboxMessageHandler.java

infrastructure/out/messaging/OutboxMessageHandler.java - Processes outbox events with fault tolerance annotations.

package com.github.swim_developer.infrastructure.out.messaging;

import com.github.swim_developer.framework.consumer.application.messaging.outbox.AbstractOutboxEventConsumer;
import com.github.swim_developer.framework.consumer.application.messaging.outbox.OutboxRouterFanOut;
import com.github.swim_developer.framework.consumer.application.port.out.SwimOutboxRetryPort;
import com.github.swim_developer.framework.domain.model.SwimOutboxEvent;
import com.github.swim_developer.framework.infrastructure.out.cache.HandoffCache;
import com.github.swim_developer.domain.model.Event;
import com.github.swim_developer.infrastructure.out.persistence.MongoEventStore;
import io.opentelemetry.instrumentation.annotations.WithSpan;
import io.quarkus.vertx.ConsumeEvent;
import io.smallrye.common.annotation.Blocking;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.faulttolerance.Bulkhead;
import org.eclipse.microprofile.faulttolerance.Retry;
import org.eclipse.microprofile.faulttolerance.Timeout;

import java.util.Optional;

@Slf4j
@ApplicationScoped
public class OutboxMessageHandler extends AbstractOutboxEventConsumer<Event> implements SwimOutboxRetryPort {

    public static final String OUTBOX_EVENT_ADDRESS = "outbox.pending";

    private final MongoEventStore eventRepository;
    private final OutboxRouterFanOut outboxRouterFanOut;
    private final HandoffCache handoffCache;

    @Inject
    public OutboxMessageHandler(MongoEventStore eventRepository,
                                OutboxRouterFanOut outboxRouterFanOut,
                                HandoffCache handoffCache,
                                @ConfigProperty(name = "swim.outbox.kafka.max-retries", defaultValue = "3") int maxKafkaRetries) {
        super(maxKafkaRetries);
        this.eventRepository = eventRepository;
        this.outboxRouterFanOut = outboxRouterFanOut;
        this.handoffCache = handoffCache;
    }

    @Override
    @ConsumeEvent(OUTBOX_EVENT_ADDRESS)
    @Blocking
    @Timeout(10000)
    @Retry(maxRetries = 3, delay = 1000)
    @Bulkhead(250)
    @WithSpan("ffice.consumer.outbox.kafka")
    public void processOutboxEvent(String eventId) {
        super.processOutboxEvent(eventId);
    }

    @Override
    public void retryOutboxEvent(SwimOutboxEvent event) {
        sendAndUpdateStatus((Event) event);
    }

    @Override
    protected Event resolveEvent(String eventIdStr) {
        Optional<Event> cached = handoffCache.getAndRemove(eventIdStr, Event.class);
        if (cached.isPresent()) {
            return cached.get();
        }
        return eventRepository.findEventById(eventIdStr);
    }

    @Override
    protected OutboxRouterFanOut getRouterFanOut() { return outboxRouterFanOut; }

    @Override
    protected String getEventId(Event event) {
        return event.getId() != null ? event.getId().toHexString() : null;
    }

    @Override
    protected void updateEvent(Event event) { eventRepository.persistOrUpdate(event); }

    public static String getEventAddress() { return OUTBOX_EVENT_ADDRESS; }
}

Create src/main/java/com/github/swim_developer/infrastructure/CacheRegistration.java:

package com.github.swim_developer.infrastructure;

import io.quarkus.cache.CacheName;
import io.quarkus.cache.Cache;
import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class CacheRegistration {

    @CacheName("processed-messages")
    Cache processedMessagesCache;
}

3.6 Replace Object with FficeMessageType in EventProcessingUseCase

The archetype generates EventProcessingUseCase.java with Object as a generic placeholder for the JAXB root type. You must replace it with the actual FF-ICE root type.

Open src/main/java/com/github/swim_developer/application/usecase/EventProcessingUseCase.java and make 4 replacements:

  1. Add the import at the top of the file (after the other imports):
    import aero.fixm.ffice.FficeMessageType;
  2. Replace all 4 occurrences of Object with FficeMessageType:
Line (approx.) Before After
Field declaration EventProcessingOrchestrator<Event, Object> EventProcessingOrchestrator<Event, FficeMessageType>
Constructor parameter SwimXmlUnmarshallerPort<Object> jaxbPool SwimXmlUnmarshallerPort<FficeMessageType> jaxbPool
Parser reference SwimEventParser<Object> parser SwimEventParser<FficeMessageType> parser
Getter return type EventProcessingOrchestrator<Event, Object> EventProcessingOrchestrator<Event, FficeMessageType>
  1. Delete the // TODO comment on the line above the field declaration.

After the edits, the file should look like this:

package com.github.swim_developer.application.usecase;

import com.github.swim_developer.application.service.ProcessingMetrics;
import com.github.swim_developer.application.port.out.SubscriptionStore;
import com.github.swim_developer.application.service.EventDataValidator;
import com.github.swim_developer.application.service.EventFilterService;
import com.github.swim_developer.application.service.EventPersistenceService;
import com.github.swim_developer.application.service.ProcessorCallbacks;
import com.github.swim_developer.domain.model.Event;
import aero.fixm.ffice.FficeMessageType;
import com.github.swim_developer.infrastructure.out.xml.EventExtractor;
import com.github.swim_developer.framework.consumer.application.messaging.processing.DefaultEventProcessorConfig;
import com.github.swim_developer.framework.application.model.PreparedEvent;
import com.github.swim_developer.framework.application.model.ProcessingContext;
import com.github.swim_developer.framework.application.model.ProcessingOutcome;
import com.github.swim_developer.framework.consumer.application.messaging.processing.EventProcessingOrchestrator;
import com.github.swim_developer.framework.consumer.application.messaging.processing.EventProcessingOrchestratorDependencies;
import com.github.swim_developer.framework.consumer.application.messaging.processing.SwimEventParser;
import com.github.swim_developer.framework.consumer.application.messaging.processing.SwimEventProcessorCallbacks;
import com.github.swim_developer.framework.application.port.in.SwimMessageInterceptor;
import com.github.swim_developer.framework.application.port.out.SwimXmlUnmarshallerPort;
import io.micrometer.core.instrument.MeterRegistry;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Any;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;

import java.util.List;

@ApplicationScoped
public class EventProcessingUseCase {

    private final EventProcessingOrchestrator<Event, FficeMessageType> orchestrator;
    private final EventPersistenceService persistenceService;

    @Inject
    public EventProcessingUseCase(
            DefaultEventProcessorConfig processorConfig,
            SwimXmlUnmarshallerPort<FficeMessageType> jaxbPool,
            EventExtractor eventExtractor,
            EventDataValidator validator,
            EventFilterService filterService,
            EventPersistenceService persistenceService,
            ProcessingMetrics metrics,
            MeterRegistry meterRegistry,
            SubscriptionStore subscriptionStore,
            @Any Instance<SwimMessageInterceptor> interceptorInstances) {
        this.persistenceService = persistenceService;
        SwimEventParser<FficeMessageType> parser = jaxbPool::unmarshalAndValidate;
        SwimEventProcessorCallbacks<Event> callbacks = new ProcessorCallbacks(metrics, subscriptionStore);
        this.orchestrator = new EventProcessingOrchestrator<>(new EventProcessingOrchestratorDependencies<>(
                processorConfig, parser, eventExtractor, validator, filterService,
                persistenceService, callbacks, meterRegistry, interceptorInstances));
    }

    public ProcessingOutcome processAndPersistSingleMessage(String subscriptionId, String queueName,
                                                            String amqpMessageId, String xml, int index) {
        return orchestrator.processMessage(new ProcessingContext(subscriptionId, queueName, amqpMessageId, xml, index, null));
    }

    public EventProcessingOrchestrator<Event, FficeMessageType> eventProcessingOrchestrator() {
        return orchestrator;
    }

    public void batchPersistAndDispatch(List<PreparedEvent<Event>> batch) {
        persistenceService.batchPersistAndDispatch(batch);
    }

    public void markBatchAsProcessed(List<PreparedEvent<Event>> batch) {
        orchestrator.markBatchAsProcessed(batch);
    }
}

3.7 Add domain fields to Event

Add FF-ICE-specific fields to the Event entity class:

private String fficeMessageType;
private String gufi;
private String aircraftIdentification;
private String departureAerodrome;
private String arrivalAerodrome;
private String messageTimestamp;
private String uniqueMessageIdentifier;

3.8 Update EventDTO and SubscriptionMapper

The archetype generates a generic EventDTO with a deliveryStatus field. Replace it with the FF-ICE-specific fields that match the domain model.

Replace src/main/java/com/github/swim_developer/infrastructure/in/rest/dto/EventDTO.java with:

package com.github.swim_developer.infrastructure.in.rest.dto;

import java.time.Instant;

public record EventDTO(
        String id,
        String messageId,
        String subscriptionId,
        Instant receivedAt,
        String fficeMessageType,
        String gufi,
        String aircraftIdentification,
        String departureAerodrome,
        String arrivalAerodrome
) {}

Update src/main/java/com/github/swim_developer/infrastructure/out/mapper/SubscriptionMapper.java to map the FF-ICE fields instead of deliveryStatus:

package com.github.swim_developer.infrastructure.out.mapper;

import com.github.swim_developer.framework.consumer.infrastructure.out.dlq.DeadLetterMessage;
import com.github.swim_developer.domain.model.Event;
import com.github.swim_developer.infrastructure.in.rest.dto.EventDTO;
import com.github.swim_developer.framework.infrastructure.out.messaging.DlqMessageDTO;
import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class SubscriptionMapper {

    public EventDTO toDTO(Event event) {
        return new EventDTO(
                event.getId() != null ? event.getId().toHexString() : null,
                event.getMessageId(),
                event.getSubscriptionId(),
                event.getReceivedAt(),
                event.getFficeMessageType(),
                event.getGufi(),
                event.getAircraftIdentification(),
                event.getDepartureAerodrome(),
                event.getArrivalAerodrome()
        );
    }

    public DlqMessageDTO toDTO(DeadLetterMessage dlq) {
        return new DlqMessageDTO(
                dlq.getId(),
                dlq.getAmqpMessageId(),
                dlq.getMessageIndex(),
                dlq.getSubscriptionId(),
                dlq.getQueueName(),
                dlq.getErrorType(),
                dlq.getErrorMessage(),
                dlq.getRawPayload(),
                dlq.getReceivedAt(),
                dlq.getFailedAt()
        );
    }
}

3.9 Verify compilation

./mvnw clean package -DskipTests        # Linux / macOS
mvnw.cmd clean package -DskipTests      # Windows