Step 7 of 8

Part 6: Tests

Step 7.1: Unit tests

Unit tests validate the domain-specific FF-ICE logic without requiring Quarkus DevServices or Testcontainers.

Test resources

Create 7 FF-ICE XML samples in src/test/resources/events/ (same content as the validator's events/ folder from Step 4.3):

  • filed-flight-plan.xml
  • flight-plan-update.xml
  • flight-departure.xml
  • flight-arrival.xml
  • flight-cancellation.xml
  • planning-status.xml
  • filing-status.xml

EventExtractorTest

Create src/test/java/com/github/swim_developer/unit/EventExtractorTest.java:

package com.github.swim_developer.unit;

import aero.fixm.ffice.FficeMessageType;
import aero.fixm.ffice.validation.FficeUnmarshallerPool;
import com.github.swim_developer.domain.model.Event;
import com.github.swim_developer.infrastructure.out.xml.EventExtractor;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;

import static org.assertj.core.api.Assertions.assertThat;

class EventExtractorTest {

    private static final FficeUnmarshallerPool unmarshallerPool = new FficeUnmarshallerPool();
    private final EventExtractor extractor = new EventExtractor();

    private static String filedFlightPlanXml;
    private static String flightDepartureXml;
    private static String flightArrivalXml;

    @BeforeAll
    static void loadXmlSamples() throws IOException {
        filedFlightPlanXml = loadResource("events/filed-flight-plan.xml");
        flightDepartureXml = loadResource("events/flight-departure.xml");
        flightArrivalXml = loadResource("events/flight-arrival.xml");
    }

    @Test
    void extractFiledFlightPlan() throws Exception {
        FficeMessageType msg = (FficeMessageType) unmarshallerPool.unmarshalAndValidate(filedFlightPlanXml);

        List<Optional<Event>> results = extractor.extract(msg);

        assertThat(results).hasSize(1);
        assertThat(results.get(0)).isPresent();

        Event event = results.get(0).get();
        assertThat(event.getFficeMessageType()).isEqualTo("FILED_FLIGHT_PLAN");
        assertThat(event.getAircraftIdentification()).isEqualTo("TAP123");
        assertThat(event.getDepartureAerodrome()).isEqualTo("LPPT");
        assertThat(event.getArrivalAerodrome()).isEqualTo("LFPG");
        assertThat(event.getGufi()).isNotBlank();
        assertThat(event.getUniqueMessageIdentifier()).isNotBlank();
        assertThat(event.getMessageTimestamp()).isNotBlank();
    }

    @Test
    void extractFlightDeparture() throws Exception {
        FficeMessageType msg = (FficeMessageType) unmarshallerPool.unmarshalAndValidate(flightDepartureXml);

        List<Optional<Event>> results = extractor.extract(msg);

        assertThat(results).hasSize(1);
        Event event = results.get(0).orElseThrow();
        assertThat(event.getFficeMessageType()).isEqualTo("FLIGHT_DEPARTURE");
        assertThat(event.getAircraftIdentification()).isEqualTo("IBE789");
        assertThat(event.getDepartureAerodrome()).isEqualTo("LEMD");
        assertThat(event.getArrivalAerodrome()).isEqualTo("EDDF");
    }

    @Test
    void extractFlightArrival() throws Exception {
        FficeMessageType msg = (FficeMessageType) unmarshallerPool.unmarshalAndValidate(flightArrivalXml);

        List<Optional<Event>> results = extractor.extract(msg);

        assertThat(results).hasSize(1);
        Event event = results.get(0).orElseThrow();
        assertThat(event.getFficeMessageType()).isEqualTo("FLIGHT_ARRIVAL");
        assertThat(event.getAircraftIdentification()).isEqualTo("AFR101");
        assertThat(event.getDepartureAerodrome()).isEqualTo("LFPG");
        assertThat(event.getArrivalAerodrome()).isEqualTo("LIRF");
    }

    @Test
    void extractNullReturnsEmpty() {
        List<Optional<Event>> results = extractor.extract(null);

        assertThat(results).hasSize(1);
        assertThat(results.get(0)).isEmpty();
    }

    @Test
    void getTypeLabelReturnsMessageType() {
        Event event = new Event();
        event.setFficeMessageType("FILED_FLIGHT_PLAN");

        assertThat(extractor.getTypeLabel(event)).isEqualTo("FILED_FLIGHT_PLAN");
    }

    @Test
    void getTypeLabelReturnsUnknownWhenNull() {
        Event event = new Event();

        assertThat(extractor.getTypeLabel(event)).isEqualTo("unknown");
    }

    private static String loadResource(String path) throws IOException {
        try (InputStream is = EventExtractorTest.class.getClassLoader().getResourceAsStream(path)) {
            assertThat(is).as("Resource %s not found", path).isNotNull();
            return new String(is.readAllBytes(), StandardCharsets.UTF_8);
        }
    }
}

FficeMessageClassifierTest

Create src/test/java/com/github/swim_developer/unit/FficeMessageClassifierTest.java:

package com.github.swim_developer.unit;

import com.github.swim_developer.extension.outbox.kafka.ffice.FficeEventCategory;
import com.github.swim_developer.extension.outbox.kafka.ffice.FficeMessageClassifier;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

import static org.assertj.core.api.Assertions.assertThat;

class FficeMessageClassifierTest {

    @ParameterizedTest
    @CsvSource({
            "events/filed-flight-plan.xml, FLIGHT_PLAN",
            "events/flight-plan-update.xml, FLIGHT_UPDATE",
            "events/flight-departure.xml, OPERATIONS",
            "events/flight-arrival.xml, OPERATIONS",
            "events/flight-cancellation.xml, OPERATIONS",
            "events/planning-status.xml, FLIGHT_UPDATE",
            "events/filing-status.xml, FLIGHT_UPDATE"
    })
    void classifyRealXmlSamples(String resourcePath, String expectedCategory) throws IOException {
        String xml = loadResource(resourcePath);

        FficeEventCategory category = FficeMessageClassifier.classify(xml);

        assertThat(category).isEqualTo(FficeEventCategory.valueOf(expectedCategory));
    }

    @Test
    void classifyUnknownMessage() {
        String xml = "<ffice:FficeMessage><ffice:type>SOMETHING_NEW</ffice:type></ffice:FficeMessage>";

        assertThat(FficeMessageClassifier.classify(xml)).isEqualTo(FficeEventCategory.UNKNOWN);
    }

    @Test
    void extractGufiFromInlineElement() {
        String xml = "<FficeMessage><globallyUniqueFlightIdentifier>abc-123</globallyUniqueFlightIdentifier></FficeMessage>";

        String gufi = FficeMessageClassifier.extractGufi(xml);

        assertThat(gufi).isEqualTo("abc-123");
    }

    @Test
    void extractGufiFromNamespacedElement() {
        String xml = "<ffice:FficeMessage><fx:gufi codeSpace=\"urn:uuid\">f47ac10b-58cc</fx:gufi></ffice:FficeMessage>";

        assertThat(FficeMessageClassifier.extractGufi(xml)).isEqualTo("unknown");
    }

    @Test
    void extractGufiReturnsUnknownWhenMissing() {
        String xml = "<ffice:FficeMessage><ffice:type>FILED_FLIGHT_PLAN</ffice:type></ffice:FficeMessage>";

        assertThat(FficeMessageClassifier.extractGufi(xml)).isEqualTo("unknown");
    }

    private static String loadResource(String path) throws IOException {
        try (InputStream is = FficeMessageClassifierTest.class.getClassLoader().getResourceAsStream(path)) {
            assertThat(is).as("Resource %s not found", path).isNotNull();
            return new String(is.readAllBytes(), StandardCharsets.UTF_8);
        }
    }
}

Step 7.2: Integration test prerequisites

The integration test requires two additions to pom.xml:

  1. Add the quarkus-wiremock dependency (provided scope) alongside the existing quarkus-wiremock-test:
<dependency>
    <groupId>io.quarkiverse.wiremock</groupId>
    <artifactId>quarkus-wiremock</artifactId>
    <version>${wiremock.version}</version>
    <scope>provided</scope>
</dependency>
  1. Configure the maven-failsafe-plugin with execution goals (replace the existing bare declaration):
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-failsafe-plugin</artifactId>
    <executions>
        <execution>
            <goals>
                <goal>integration-test</goal>
                <goal>verify</goal>
            </goals>
        </execution>
    </executions>
    <configuration>
        <argLine>@{argLine} --add-opens java.base/java.lang=ALL-UNNAMED -javaagent:${settings.localRepository}/org/mockito/mockito-core/${mockito.version}/mockito-core-${mockito.version}.jar -Xshare:off</argLine>
        <systemPropertyVariables>
            <native.image.path>${project.build.directory}/${project.build.finalName}-runner</native.image.path>
            <java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
            <maven.home>${maven.home}</maven.home>
        </systemPropertyVariables>
    </configuration>
</plugin>

Step 7.3: Test profile (application-test.properties)

Create src/main/resources/application-test.properties:

# =============================================================================
# SWIM FF-ICE Consumer - Test Profile
# =============================================================================
swim.subscription.renewal.enabled=false
ffice.subscriptions=[]
ffice.subscriptions.delete-and-recreate=false
swim.subscriptions.delete-and-recreate=false

# =============================================================================
# MongoDB DevServices (Testcontainers)
# =============================================================================
quarkus.mongodb.devservices.image-name=mongo:8.2.7
quarkus.mongodb.server-selection-timeout=5000

# =============================================================================
# SWIM Providers (Test - WireMock SM + DevServices AMQP)
# =============================================================================
swim.providers=[{"providerId":"test-provider","subscriptionManager":{"url":"http://localhost:${quarkus.wiremock.devservices.port}","tls":null,"resilience":{"connectTimeoutMs":5000,"readTimeoutMs":10000,"retryMaxAttempts":4,"retryDelayMs":500}},"amqpBroker":{"host":"${amqp-host:localhost}","port":${amqp-port:5672},"sslEnabled":false,"username":"${amqp-user:guest}","password":"${amqp-password:guest}","tls":null}}]

# =============================================================================
# Schedulers - DISABLED (999d delay prevents execution during tests)
# =============================================================================
swim.scheduler.initial-delay=999d
reconciliation.retry.interval=999d
reconciliation.retry.initial-delay=999d
swim.inbox.recovery.interval=999d
swim.outbox.recovery.interval=999d
swim.outbox.cleanup.interval=999d
swim.tls.reload-period=off

# =============================================================================
# OpenTelemetry - DISABLED
# =============================================================================
quarkus.otel.enabled=false
quarkus.otel.sdk.disabled=true

# =============================================================================
# OIDC/Keycloak - DISABLED
# =============================================================================
quarkus.oidc.enabled=false
quarkus.oidc.tenant-enabled=false

# =============================================================================
# Kubernetes Client DevServices - DISABLED
# =============================================================================
quarkus.kubernetes-client.devservices.enabled=false

# =============================================================================
# Management Endpoint - DISABLED
# =============================================================================
quarkus.management.enabled=false

# =============================================================================
# Fault Tolerance (Test-specific)
# =============================================================================
swim.subscription.api.timeout=10000
swim.subscription.api.max-retries=3
mp.fault.tolerance.interceptor.priority=5
smallrye.faulttolerance.global.retry.enabled=true
smallrye.faulttolerance.enabled=true
quarkus.smallrye-fault-tolerance.interceptor.enabled=true

# =============================================================================
# Per-Subscription Heartbeat Monitor (fast intervals for tests)
# =============================================================================
swim.heartbeat.monitor.enabled=true
swim.heartbeat.monitor.tolerance=4s
swim.heartbeat.monitor.check-interval=1s

# =============================================================================
# Logging
# =============================================================================
quarkus.log.file.enabled=true
quarkus.log.file.path=target/app.log
quarkus.log.category."com.github.swim_developer".level=DEBUG

Step 7.4: Integration test class

The integration test validates the full consumer lifecycle with real infrastructure. Each test has Javadoc explaining what it proves and why it matters for the SFG/CP1.

What these 19 tests demonstrate:

Group Tests Framework capability proved
Subscription lifecycle 7 Full CRUD with SM, configHash deduplication, input validation
Event pipeline 5 JAXB extraction, DLQ routing, CP1 audit immutability, L1+L2 idempotency
Event routing 2 FF-ICE message classification to Kafka topics
Subscription guard 1 PAUSED subscriptions discard events before parsing
Observability 3 Stats, DLQ query, Kubernetes liveness probe
Self-healing 1 Automatic re-subscription when provider loses state (404)

Create src/test/java/com/github/swim_developer/integration/FficeConsumerIT.java:

package com.github.swim_developer.integration;

import com.github.swim_developer.extension.outbox.kafka.ffice.FficeEventCategory;
import com.github.swim_developer.extension.outbox.kafka.ffice.FficeMessageClassifier;
import com.github.swim_developer.framework.application.model.OutboxDeliveryStatus;
import com.github.swim_developer.framework.application.model.ProcessingOutcome;
import com.github.swim_developer.framework.consumer.infrastructure.out.dlq.DeadLetterMessage;
import com.github.swim_developer.framework.infrastructure.util.HashUtil;
import com.github.swim_developer.domain.model.Event;
import com.github.swim_developer.domain.model.Subscription;
import com.github.swim_developer.framework.persistence.mongodb.MongoDeadLetterStore;
import com.github.swim_developer.infrastructure.out.persistence.MongoEventStore;
import com.github.swim_developer.infrastructure.out.persistence.MongoSubscriptionStore;
import com.github.swim_developer.application.usecase.EventProcessingUseCase;
import com.github.swim_developer.application.usecase.SubscriptionUseCase;
import com.github.swim_developer.framework.consumer.infrastructure.out.idempotency.AbstractIdempotencyCache;
import com.github.tomakehurst.wiremock.client.WireMock;
import io.quarkiverse.wiremock.devservice.ConnectWireMock;
import io.quarkus.cache.Cache;
import io.quarkus.cache.CacheName;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.http.ContentType;
import jakarta.inject.Inject;

import org.junit.jupiter.api.*;

import java.util.List;
import java.util.Map;

import static com.github.tomakehurst.wiremock.client.WireMock.*;
import static io.restassured.RestAssured.given;
import static org.assertj.core.api.Assertions.assertThat;

/**
 * Integration tests for the FF-ICE consumer with real infrastructure.
 *
 * <p>Uses Quarkus Dev Services (Testcontainers) to spin up:</p>
 * <ul>
 *   <li><b>MongoDB</b> - event and subscription persistence</li>
 *   <li><b>Kafka (Redpanda)</b> - inbox/outbox event streaming</li>
 *   <li><b>WireMock</b> - simulates the SWIM Subscription Manager REST API</li>
 *   <li><b>Artemis</b> - AMQP 1.0 broker</li>
 * </ul>
 *
 * <h2>What These Tests Prove to the SFG</h2>
 * <ol>
 *   <li>The framework delivers a fully operational SWIM consumer from just 10 domain classes</li>
 *   <li>Self-healing: consumer recovers automatically when the provider loses subscription state</li>
 *   <li>Fault tolerance: retries with backoff handle transient network failures</li>
 *   <li>CP1 audit compliance: persisted payloads are immutable after persistence</li>
 *   <li>Idempotency: duplicate AMQP messages are detected via content hash (L1 + L2 cache)</li>
 *   <li>Event routing: FF-ICE messages are classified and dispatched to the correct Kafka topic</li>
 * </ol>
 */
@QuarkusTest
@ConnectWireMock
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
class FficeConsumerIT {

    private static final String VALID_FFICE_XML = """
            <?xml version="1.0" encoding="UTF-8"?>
            <ffice:FficeMessage xmlns:ffice="http://www.fixm.aero/app/ffice/1.1"
                                xmlns:fx="http://www.fixm.aero/flight/4.3"
                                xmlns:fb="http://www.fixm.aero/base/4.3">
                <ffice:flight>
                    <fx:arrival>
                        <fx:destinationAerodrome>
                            <fb:locationIndicator>LFPG</fb:locationIndicator>
                        </fx:destinationAerodrome>
                    </fx:arrival>
                    <fx:departure>
                        <fx:departureAerodrome>
                            <fb:locationIndicator>LPPT</fb:locationIndicator>
                        </fx:departureAerodrome>
                    </fx:departure>
                    <fx:flightIdentification>
                        <fx:aircraftIdentification>TAP123</fx:aircraftIdentification>
                        <fx:gufi codeSpace="urn:uuid" creationTime="2026-05-06T07:00:00Z" namespaceDomain="FULLY_QUALIFIED_DOMAIN_NAME" namespaceIdentifier="swim-developer.github.io">f47ac10b-58cc-4372-a567-0e02b2c3d479</fx:gufi>
                    </fx:flightIdentification>
                </ffice:flight>
                <ffice:timestamp>2026-05-06T08:00:00.000Z</ffice:timestamp>
                <ffice:type>FILED_FLIGHT_PLAN</ffice:type>
                <ffice:uniqueMessageIdentifier codeSpace="urn:uuid">a1b2c3d4-e5f6-4890-abcd-ef1234567890</ffice:uniqueMessageIdentifier>
            </ffice:FficeMessage>
            """;

    private static final String INVALID_XML = "<not-valid-fixm>broken</not-valid-fixm>";

    WireMock wiremock;

    @Inject
    EventProcessingUseCase eventProcessor;

    @Inject
    MongoEventStore eventRepository;

    @Inject
    MongoDeadLetterStore dlqRepository;

    @Inject
    MongoSubscriptionStore subscriptionRepository;

    @Inject
    AbstractIdempotencyCache idempotencyCache;

    @Inject
    SubscriptionUseCase subscriptionService;

    @Inject
    @CacheName("processed-messages")
    Cache l1Cache;

    @BeforeEach
    void cleanDatabase(TestInfo testInfo) {
        System.out.printf("%n== > %s.%s%n", getClass().getSimpleName(), testInfo.getDisplayName());
        eventRepository.deleteAll();
        dlqRepository.deleteAll();
        subscriptionRepository.deleteAllSubscriptions();
        l1Cache.invalidateAll().await().indefinitely();
        wiremock.removeMappings();
        wiremock.resetAllScenarios();
        wiremock.resetRequests();
    }

    // ─── Group 1: Subscription Lifecycle ─────────────────────────────────

    /**
     * Full subscription creation: REST API call, WireMock SM interaction, MongoDB persistence.
     *
     * <p><b>Framework capability:</b> The entire subscription lifecycle (POST to SM, receive PAUSED
     * response, PUT to activate, persist locally) is handled by the framework. The developer
     * writes zero subscription management code.</p>
     */
    @Test
    @Order(1)
    void createSubscriptionEndToEnd() {
        stubSubscriptionManagerCreate("sub-IT-001", "FFICE-client-sub-IT-001");

        var body = Map.of("topic", "ffice.v1", "description", "Integration test");

        given()
                .contentType(ContentType.JSON)
                .body(body)
                .when()
                .post("/api/v1/subscriptions")
                .then()
                .statusCode(201);

        var persisted = subscriptionRepository.findBySubscriptionId("sub-IT-001");
        assertThat(persisted).isPresent();
        assertThat(persisted.get().getQueueName()).isEqualTo("FFICE-client-sub-IT-001");

        wiremock.verifyThat(postRequestedFor(urlPathEqualTo("/swim/v1/subscriptions")));
        wiremock.verifyThat(putRequestedFor(urlPathEqualTo("/swim/v1/subscriptions/sub-IT-001")));
    }

    /**
     * Duplicate configHash detection: identical subscription request returns existing
     * subscription WITHOUT calling the Subscription Manager again.
     *
     * <p><b>Framework capability:</b> Content-hash deduplication prevents redundant SM calls,
     * protecting against operator mistakes and automated retries.</p>
     */
    @Test
    @Order(2)
    void duplicateConfigHashReturnsExistingWithoutCallingSm() {
        stubSubscriptionManagerCreate("sub-dup-cfg-1", "FFICE-client-sub-dup-cfg-1");

        var body = Map.of("topic", "ffice.v1", "description", "First call");

        given().contentType(ContentType.JSON).body(body)
                .when().post("/api/v1/subscriptions")
                .then().statusCode(201);

        wiremock.resetRequests();

        var secondResponse = given().contentType(ContentType.JSON).body(body)
                .when().post("/api/v1/subscriptions")
                .then().statusCode(201)
                .extract().jsonPath();

        assertThat(secondResponse.getString("subscriptionId")).isEqualTo("sub-dup-cfg-1");
        wiremock.verifyThat(0, postRequestedFor(urlPathEqualTo("/swim/v1/subscriptions")));
    }

    /**
     * List all subscriptions from MongoDB.
     *
     * <p><b>Framework capability:</b> REST API for subscription querying is fully generated
     * by the archetype and backed by MongoDB via the framework persistence layer.</p>
     */
    @Test
    @Order(3)
    void listSubscriptionsFromMongoDB() {
        seedSubscription("sub-list-1", "ACTIVE");
        seedSubscription("sub-list-2", "PAUSED");

        var response = given()
                .when().get("/api/v1/subscriptions")
                .then().statusCode(200)
                .extract().body().jsonPath();

        assertThat(response.getList("$")).hasSize(2);
    }

    /**
     * Pause subscription: consumer REST API forwards to SM via WireMock, persists new status.
     *
     * <p><b>SWIM compliance:</b> Pause/resume follows the SWIM Registry pattern
     * (PUT /swim/v1/subscriptions/{id} with subscription_status field).</p>
     */
    @Test
    @Order(4)
    void pauseSubscriptionViaApi() {
        seedSubscription("sub-pause-1", "ACTIVE");
        stubSubscriptionManagerUpdate("sub-pause-1", "PAUSED");

        given()
                .contentType(ContentType.JSON)
                .body(Map.of("subscription_status", "PAUSED"))
                .when().put("/api/v1/subscriptions/sub-pause-1")
                .then().statusCode(200);

        var updated = subscriptionRepository.findBySubscriptionId("sub-pause-1");
        assertThat(updated).isPresent();
        assertThat(updated.get().getSubscriptionStatus()).isEqualTo("PAUSED");
    }

    /**
     * Resume subscription: transitions from PAUSED to ACTIVE.
     */
    @Test
    @Order(5)
    void resumeSubscriptionViaApi() {
        seedSubscription("sub-resume-1", "PAUSED");
        stubSubscriptionManagerUpdate("sub-resume-1", "ACTIVE");

        given()
                .contentType(ContentType.JSON)
                .body(Map.of("subscription_status", "ACTIVE"))
                .when().put("/api/v1/subscriptions/sub-resume-1")
                .then().statusCode(200);

        var updated = subscriptionRepository.findBySubscriptionId("sub-resume-1");
        assertThat(updated).isPresent();
        assertThat(updated.get().getSubscriptionStatus()).isEqualTo("ACTIVE");
    }

    /**
     * Delete subscription: consumer REST API forwards DELETE to SM, removes local record.
     */
    @Test
    @Order(6)
    void deleteSubscriptionCleanup() {
        seedSubscription("sub-del-1", "ACTIVE");

        wiremock.register(delete(urlPathEqualTo("/swim/v1/subscriptions/sub-del-1"))
                .willReturn(aResponse().withStatus(204)));

        given()
                .when().delete("/api/v1/subscriptions/sub-del-1")
                .then().statusCode(204);

        assertThat(subscriptionRepository.findBySubscriptionId("sub-del-1")).isEmpty();
        wiremock.verifyThat(deleteRequestedFor(urlPathEqualTo("/swim/v1/subscriptions/sub-del-1")));
    }

    /**
     * API contract validation: POST without required "topic" field is rejected.
     *
     * <p><b>Framework capability:</b> Input validation is enforced by the framework
     * before any SM call is made.</p>
     */
    @Test
    @Order(7)
    void createSubscriptionWithoutTopicRejects() {
        given()
                .contentType(ContentType.JSON)
                .body(Map.of("description", "no topic"))
                .when().post("/api/v1/subscriptions")
                .then().statusCode(400);
    }

    // ─── Group 2: Event Processing Pipeline ──────────────────────────────

    /**
     * Full pipeline: valid FF-ICE XML is parsed, domain fields extracted, persisted to MongoDB.
     *
     * <p><b>What this proves:</b> The 10 domain classes (EventExtractor, JaxbUnmarshallerPool,
     * XmlEnvelopeParser, EventDataValidator, EventFilterService, EventPersistenceService,
     * ProcessorCallbacks, FficeSubscriptionRenewalStrategy, InboxMessageHandler,
     * OutboxMessageHandler) integrate correctly with the framework orchestrator. GUFI,
     * aircraft identification, departure/arrival aerodromes, and message type are all
     * extracted from the FIXM 4.3 / FF-ICE 1.1 XML and persisted as first-class fields.</p>
     */
    @Test
    @Order(10)
    void validFficePersistedWithFullMetadata() {
        eventProcessor.processAndPersistSingleMessage(
                "sub-pipe-1", "queue-1", "AMQP-MSG-001", VALID_FFICE_XML, 0);

        List<Event> events = eventRepository.listAllDomain();
        assertThat(events).hasSize(1);

        Event event = events.get(0);
        assertThat(event.getSubscriptionId()).isEqualTo("sub-pipe-1");
        assertThat(event.getContentHash()).isNotEmpty();
        assertThat(event.getDeliveryStatus()).isIn(OutboxDeliveryStatus.PENDING, OutboxDeliveryStatus.SENT);
        assertThat(event.getRawPayload()).isEqualTo(VALID_FFICE_XML);
        assertThat(event.getFficeMessageType()).isEqualTo("FILED_FLIGHT_PLAN");
        assertThat(event.getGufi()).contains("f47ac10b");
        assertThat(event.getAircraftIdentification()).isEqualTo("TAP123");
        assertThat(event.getDepartureAerodrome()).isEqualTo("LPPT");
        assertThat(event.getArrivalAerodrome()).isEqualTo("LFPG");
    }

    /**
     * Invalid XML (not FIXM) is rejected and routed to the Dead Letter Queue.
     *
     * <p><b>CP1 compliance:</b> Non-conformant payloads must never reach business logic.
     * The framework validates all XML against the FIXM schema before extraction.
     * Rejected messages are preserved in the DLQ with error metadata for audit.</p>
     */
    @Test
    @Order(11)
    void invalidXmlRoutedToDlq() {
        try {
            eventProcessor.processAndPersistSingleMessage(
                    "sub-dlq-1", "queue-1", "AMQP-INVALID-001", INVALID_XML, 0);
        } catch (RuntimeException e) {
            // expected
        }

        assertThat(eventRepository.listAll()).isEmpty();

        List<DeadLetterMessage> dlqMessages = dlqRepository.listAllDomain();
        assertThat(dlqMessages).hasSize(1);
        assertThat(dlqMessages.get(0).getErrorType()).isEqualTo("VALIDATION_ERROR");
        assertThat(dlqMessages.get(0).getRawPayload()).isEqualTo(INVALID_XML);
    }

    /**
     * CP1 Audit Rule: once an event is persisted, audit-critical fields must remain immutable.
     * The outbox scheduler may update deliveryStatus, but rawPayload, contentHash,
     * subscriptionId, and messageId must never change.
     *
     * <p><b>Regulatory requirement:</b> EU Regulation 2021/116 (CP1) mandates a complete
     * audit trail. This test proves the framework preserves payload integrity across
     * lifecycle transitions (PENDING to SENT).</p>
     */
    @Test
    @Order(12)
    void auditFieldsRemainImmutableAfterPersistence() {
        eventProcessor.processAndPersistSingleMessage(
                "sub-audit-1", "queue-1", "AMQP-AUDIT-001", VALID_FFICE_XML, 0);

        Event original = eventRepository.listAllDomain().get(0);
        String originalPayload = original.getRawPayload();
        String originalHash = original.getContentHash();
        String originalSubId = original.getSubscriptionId();
        String originalMsgId = original.getMessageId();

        original.setDeliveryStatus(OutboxDeliveryStatus.SENT);
        eventRepository.update(original);

        Event reloaded = eventRepository.listAllDomain().get(0);
        assertThat(reloaded.getRawPayload()).isEqualTo(originalPayload);
        assertThat(reloaded.getContentHash()).isEqualTo(originalHash);
        assertThat(reloaded.getSubscriptionId()).isEqualTo(originalSubId);
        assertThat(reloaded.getMessageId()).isEqualTo(originalMsgId);
        assertThat(reloaded.getDeliveryStatus()).isEqualTo(OutboxDeliveryStatus.SENT);
    }

    /**
     * Duplicate content (same SHA-256 hash) is silently discarded. Only 1 event persisted.
     *
     * <p><b>Framework capability:</b> At-least-once AMQP delivery means duplicates are
     * inevitable. The framework uses a two-tier idempotency cache (L1 Caffeine + L2 MongoDB)
     * to guarantee exactly-once processing without developer intervention.</p>
     */
    @Test
    @Order(13)
    void duplicateContentDiscardedByIdempotency() {
        eventProcessor.processAndPersistSingleMessage(
                "sub-dup-1", "queue-1", "AMQP-DUP-001", VALID_FFICE_XML, 0);

        eventProcessor.processAndPersistSingleMessage(
                "sub-dup-1", "queue-1", "AMQP-DUP-002", VALID_FFICE_XML, 0);

        assertThat(eventRepository.listAll()).hasSize(1);
    }

    /**
     * Idempotency persists to MongoDB (L2 cache). After processing, the cache reports
     * the content hash as already processed, proving deduplication survives application
     * restarts and L1 cache eviction.
     *
     * <p><b>Production resilience:</b> When a pod restarts, the L1 Caffeine cache is lost.
     * The L2 MongoDB cache ensures duplicates are still detected. This test proves
     * the full chain: hash computation, L1 insert, L2 persistence, and lookup.</p>
     */
    @Test
    @Order(14)
    void idempotencyPersistsToMongoDbL2Cache() {
        String hash = HashUtil.sha256(VALID_FFICE_XML);

        eventProcessor.processAndPersistSingleMessage(
                "sub-idem-1", "queue-1", "AMQP-IDEM-001", VALID_FFICE_XML, 0);

        assertThat(eventRepository.listAll()).hasSize(1);
        assertThat(idempotencyCache.isAlreadyProcessed("sub-idem-1", hash)).isTrue();
    }

    // ─── Group 3: Event Routing ──────────────────────────────────────────

    /**
     * FILED_FLIGHT_PLAN is classified as FLIGHT_PLAN category for Kafka routing.
     *
     * <p><b>Framework capability:</b> The outbox extension (swim-outbox-kafka-ffice) uses
     * FficeMessageClassifier to determine the target Kafka topic. Each FF-ICE message type
     * maps to a specific category, enabling downstream systems to subscribe only to
     * the events they care about.</p>
     */
    @Test
    @Order(15)
    void routingClassifiesFiledFlightPlanCorrectly() {
        eventProcessor.processAndPersistSingleMessage(
                "sub-route-1", "queue-1", "AMQP-ROUTE-001", VALID_FFICE_XML, 0);

        Event persisted = eventRepository.listAllDomain().get(0);
        assertThat(FficeMessageClassifier.classify(persisted.getRawPayload()))
                .isEqualTo(FficeEventCategory.FLIGHT_PLAN);
    }

    /**
     * FLIGHT_DEPARTURE is classified as OPERATIONS category, distinct from FLIGHT_PLAN.
     *
     * <p><b>Domain significance:</b> Flight departures are operational events consumed by
     * ATC systems in real time, while flight plans are planning artifacts. Correct routing
     * ensures the right system gets the right data.</p>
     */
    @Test
    @Order(16)
    void routingClassifiesFlightDepartureCorrectly() {
        String departureXml = VALID_FFICE_XML.replace("FILED_FLIGHT_PLAN", "FLIGHT_DEPARTURE");
        eventProcessor.processAndPersistSingleMessage(
                "sub-route-dep", "queue-1", "AMQP-ROUTE-DEP", departureXml, 0);

        Event persisted = eventRepository.listAllDomain().get(0);
        assertThat(FficeMessageClassifier.classify(persisted.getRawPayload()))
                .isEqualTo(FficeEventCategory.OPERATIONS);
    }

    // ─── Group 4: Subscription Guard ─────────────────────────────────────

    /**
     * Events arriving for a PAUSED subscription are silently discarded.
     *
     * <p><b>Framework capability:</b> The preProcess guard checks subscription status
     * in MongoDB before any XML parsing. This is a business rule: paused means
     * "stop processing", not just "stop receiving". No CPU wasted on JAXB parsing
     * for a subscription the operator has intentionally paused.</p>
     */
    @Test
    @Order(17)
    void pausedSubscriptionDiscardsEvents() {
        seedSubscription("sub-paused-discard", "PAUSED");

        var outcome = eventProcessor.processAndPersistSingleMessage(
                "sub-paused-discard", "queue-1", "AMQP-PAUSED-001", VALID_FFICE_XML, 0);

        assertThat(outcome).isEqualTo(ProcessingOutcome.SKIPPED);
        assertThat(eventRepository.listAllDomain()).isEmpty();
        assertThat(dlqRepository.listAllDomain()).isEmpty();
    }

    // ─── Group 5: Observability ──────────────────────────────────────────

    /**
     * Aggregate statistics reflect the real consumer state after mixed operations.
     *
     * <p><b>Operational value:</b> ANSPs need real-time visibility into consumer health.
     * This endpoint powers dashboards that show processed events, DLQ depth, and
     * active subscriptions at a glance.</p>
     */
    @Test
    @Order(20)
    void statsReflectRealState() {
        seedSubscription("sub-stats-1", "ACTIVE");

        eventProcessor.processAndPersistSingleMessage(
                "sub-stats-1", "queue-1", "AMQP-STATS-001", VALID_FFICE_XML, 0);
        try {
            eventProcessor.processAndPersistSingleMessage(
                    "sub-stats-1", "queue-1", "AMQP-STATS-002", INVALID_XML, 0);
        } catch (RuntimeException e) {
            // expected
        }

        var response = given()
                .when().get("/swim/v1/operational/stats")
                .then().statusCode(200)
                .extract().body().jsonPath();

        assertThat(response.getLong("totalEvents")).isEqualTo(1);
        assertThat(response.getLong("totalDlq")).isEqualTo(1);
        assertThat(response.getInt("activeSubscriptions")).isEqualTo(1);
    }

    /**
     * DLQ query returns rejected messages with pagination.
     *
     * <p><b>Audit capability:</b> Operators and regulators can inspect every rejected
     * message, including the original raw payload and the validation error reason.</p>
     */
    @Test
    @Order(21)
    void queryDlqAfterRejection() {
        try {
            eventProcessor.processAndPersistSingleMessage(
                    "sub-dlq-q", "queue-1", "AMQP-DLQ-Q-001", INVALID_XML, 0);
        } catch (RuntimeException e) {
            // expected
        }

        var response = given()
                .when().get("/swim/v1/operational/dlq?page=0&size=10")
                .then().statusCode(200)
                .extract().body().jsonPath();

        assertThat(response.getList("content")).hasSize(1);
    }

    /**
     * Liveness probe returns UP when the application is running.
     *
     * <p><b>Kubernetes integration:</b> OpenShift uses this probe to determine if the
     * pod needs restarting. The framework registers health checks for MongoDB,
     * AMQP connections, and heartbeat monitoring automatically.</p>
     */
    @Test
    @Order(22)
    void livenessProbeUp() {
        given()
                .when().get("/q/health/live")
                .then().statusCode(200);
    }

    // ─── Group 6: Self-Healing ───────────────────────────────────────────

    /**
     * Provider returns 404 during resume: framework deletes the stale local subscription
     * and triggers a full re-subscription cycle (POST, PAUSED, ACTIVE).
     *
     * <p><b>This is the gold standard for the SFG.</b> In production, SWIM providers
     * (EUROCONTROL, Austrocontrol, LFV) may lose subscription state after upgrades,
     * disaster recovery, or database migrations. Without self-healing, an ANSP would
     * need manual intervention to restore data flow. The framework detects the 404,
     * cleans up the orphan, and recreates the subscription automatically.</p>
     */
    @Test
    @Order(40)
    void automaticResubscriptionOnProviderStateLoss() {
        seedSubscription("sub-lost-1", "ACTIVE");

        wiremock.register(put(urlPathEqualTo("/swim/v1/subscriptions/sub-lost-1"))
                .willReturn(aResponse().withStatus(404)));

        stubSubscriptionManagerCreate("sub-recovered-1", "FFICE-client-sub-recovered-1");

        try {
            subscriptionService.resumeSubscription("sub-lost-1");
        } catch (Exception e) {
            // expected: provider returns 404 for lost subscription
        }

        assertThat(subscriptionRepository.findBySubscriptionId("sub-lost-1"))
                .as("Old subscription must be deleted after provider 404")
                .isEmpty();

        assertThat(subscriptionRepository.findBySubscriptionId("sub-recovered-1"))
                .as("New subscription must be created after provider state loss recovery")
                .isPresent()
                .get()
                .satisfies(s -> assertThat(s.getSubscriptionStatus()).isEqualTo("ACTIVE"));

        wiremock.verifyThat(postRequestedFor(urlPathEqualTo("/swim/v1/subscriptions")));
    }

    // ── Helpers ──────────────────────────────────────────────────────────

    private void stubSubscriptionManagerCreate(String subscriptionId, String queueName) {
        String responseJson = """
                {
                    "subscriptionId": "%s",
                    "subscriptionStatus": "PAUSED",
                    "queueName": "%s",
                    "topic": "ffice.v1",
                    "description": "Integration test"
                }
                """.formatted(subscriptionId, queueName);

        wiremock.register(post(urlPathEqualTo("/swim/v1/subscriptions"))
                .willReturn(aResponse()
                        .withStatus(201)
                        .withHeader("Content-Type", "application/json")
                        .withBody(responseJson)));

        stubSubscriptionManagerUpdate(subscriptionId, "ACTIVE");
    }

    private void stubSubscriptionManagerUpdate(String subscriptionId, String newStatus) {
        String responseJson = """
                {
                    "subscriptionId": "%s",
                    "subscriptionStatus": "%s",
                    "queueName": "FFICE-client-%s",
                    "topic": "ffice.v1"
                }
                """.formatted(subscriptionId, newStatus, subscriptionId);

        wiremock.register(put(urlPathEqualTo("/swim/v1/subscriptions/" + subscriptionId))
                .willReturn(aResponse()
                        .withStatus(200)
                        .withHeader("Content-Type", "application/json")
                        .withBody(responseJson)));
    }

    private void seedSubscription(String subscriptionId, String status) {
        Subscription sub = new Subscription();
        sub.setSubscriptionId(subscriptionId);
        sub.setQueueName("FFICE-client-" + subscriptionId);
        sub.setSubscriptionStatus(status);
        sub.setTopic("ffice.v1");
        sub.setDescription("Seeded for test");
        sub.setType(com.github.swim_developer.framework.domain.model.SubscriptionType.DECLARED.name());
        sub.setConfigHash("test-hash-" + subscriptionId);
        sub.setProviderId("test-provider");
        subscriptionRepository.persistSubscription(sub);
    }
}

Step 7.5: Run tests

From the swim-ffice-consumer directory:

# Unit tests only
./mvnw test

# Unit + integration tests
./mvnw verify -DskipITs=false

# Integration tests only (skip unit tests)
./mvnw verify -DskipITs=false -Dsurefire.skip=true

Expected output: 17 unit tests + 19 integration tests = 36 tests, 0 failures.

The integration tests cover 6 critical areas:

  1. Subscription lifecycle (7 tests): create, list, pause, resume, delete, duplicate detection, input validation
  2. Event pipeline (5 tests): full metadata extraction, DLQ routing, CP1 audit immutability, L1+L2 idempotency
  3. Event routing (2 tests): FF-ICE message classification (FILED_FLIGHT_PLAN, FLIGHT_DEPARTURE)
  4. Subscription guard (1 test): PAUSED subscriptions discard events before JAXB parsing
  5. Observability (3 tests): aggregate stats, DLQ query with pagination, Kubernetes liveness probe
  6. Self-healing (1 test): automatic re-subscription when provider loses state (404 recovery)