Part 6: Tests
Step 7.1: Unit tests
Unit tests validate the domain-specific FF-ICE logic without requiring Quarkus DevServices or Testcontainers.
Test resources
Create 7 FF-ICE XML samples in src/test/resources/events/ (same content as the validator's events/ folder from Step 4.3):
- filed-flight-plan.xml
- flight-plan-update.xml
- flight-departure.xml
- flight-arrival.xml
- flight-cancellation.xml
- planning-status.xml
- filing-status.xml
EventExtractorTest
Create src/test/java/com/github/swim_developer/unit/EventExtractorTest.java:
package com.github.swim_developer.unit;
import aero.fixm.ffice.FficeMessageType;
import aero.fixm.ffice.validation.FficeUnmarshallerPool;
import com.github.swim_developer.domain.model.Event;
import com.github.swim_developer.infrastructure.out.xml.EventExtractor;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import static org.assertj.core.api.Assertions.assertThat;
/**
 * Unit tests for {@link EventExtractor}, run against the FF-ICE XML samples in
 * {@code src/test/resources/events/}.
 *
 * <p>Each sample is unmarshalled through {@link FficeUnmarshallerPool} and handed to the
 * extractor; the assertions pin the domain fields read from the resulting {@link Event}.</p>
 */
class EventExtractorTest {

    private static final FficeUnmarshallerPool unmarshallerPool = new FficeUnmarshallerPool();

    private static String filedFlightPlanXml;
    private static String flightDepartureXml;
    private static String flightArrivalXml;

    private final EventExtractor extractor = new EventExtractor();

    /** Reads the three XML samples once for the whole class. */
    @BeforeAll
    static void loadXmlSamples() throws IOException {
        filedFlightPlanXml = loadResource("events/filed-flight-plan.xml");
        flightDepartureXml = loadResource("events/flight-departure.xml");
        flightArrivalXml = loadResource("events/flight-arrival.xml");
    }

    /** A filed flight plan yields one event carrying type, callsign, aerodromes and identifiers. */
    @Test
    void extractFiledFlightPlan() throws Exception {
        Optional<Event> extracted = extractOnly(filedFlightPlanXml);
        assertThat(extracted).isPresent();

        Event flightPlan = extracted.orElseThrow();
        assertThat(flightPlan.getFficeMessageType()).isEqualTo("FILED_FLIGHT_PLAN");
        assertThat(flightPlan.getAircraftIdentification()).isEqualTo("TAP123");
        assertThat(flightPlan.getDepartureAerodrome()).isEqualTo("LPPT");
        assertThat(flightPlan.getArrivalAerodrome()).isEqualTo("LFPG");
        assertThat(flightPlan.getGufi()).isNotBlank();
        assertThat(flightPlan.getUniqueMessageIdentifier()).isNotBlank();
        assertThat(flightPlan.getMessageTimestamp()).isNotBlank();
    }

    /** The departure sample yields one FLIGHT_DEPARTURE event for IBE789, LEMD to EDDF. */
    @Test
    void extractFlightDeparture() throws Exception {
        Event departure = extractOnly(flightDepartureXml).orElseThrow();

        assertThat(departure.getFficeMessageType()).isEqualTo("FLIGHT_DEPARTURE");
        assertThat(departure.getAircraftIdentification()).isEqualTo("IBE789");
        assertThat(departure.getDepartureAerodrome()).isEqualTo("LEMD");
        assertThat(departure.getArrivalAerodrome()).isEqualTo("EDDF");
    }

    /** The arrival sample yields one FLIGHT_ARRIVAL event for AFR101, LFPG to LIRF. */
    @Test
    void extractFlightArrival() throws Exception {
        Event arrival = extractOnly(flightArrivalXml).orElseThrow();

        assertThat(arrival.getFficeMessageType()).isEqualTo("FLIGHT_ARRIVAL");
        assertThat(arrival.getAircraftIdentification()).isEqualTo("AFR101");
        assertThat(arrival.getDepartureAerodrome()).isEqualTo("LFPG");
        assertThat(arrival.getArrivalAerodrome()).isEqualTo("LIRF");
    }

    /** A {@code null} message produces a one-element list holding an empty Optional. */
    @Test
    void extractNullReturnsEmpty() {
        List<Optional<Event>> extracted = extractor.extract(null);

        assertThat(extracted).hasSize(1);
        assertThat(extracted.get(0)).isEmpty();
    }

    /** The type label of an event is its FF-ICE message type. */
    @Test
    void getTypeLabelReturnsMessageType() {
        Event typed = new Event();
        typed.setFficeMessageType("FILED_FLIGHT_PLAN");

        assertThat(extractor.getTypeLabel(typed)).isEqualTo("FILED_FLIGHT_PLAN");
    }

    /** An event without a message type is labelled {@code "unknown"}. */
    @Test
    void getTypeLabelReturnsUnknownWhenNull() {
        Event untyped = new Event();

        assertThat(extractor.getTypeLabel(untyped)).isEqualTo("unknown");
    }

    /**
     * Unmarshals {@code xml}, runs the extractor and checks that exactly one result came back.
     *
     * @param xml FF-ICE message as a string
     * @return the single extraction result, possibly empty
     * @throws Exception if unmarshalling or validation fails
     */
    private Optional<Event> extractOnly(String xml) throws Exception {
        FficeMessageType message = (FficeMessageType) unmarshallerPool.unmarshalAndValidate(xml);
        List<Optional<Event>> extracted = extractor.extract(message);
        assertThat(extracted).hasSize(1);
        return extracted.get(0);
    }

    /**
     * Reads a classpath resource as UTF-8 text, failing the test if it does not exist.
     *
     * @param path resource path relative to the classpath root
     * @return the resource content
     * @throws IOException if the stream cannot be read
     */
    private static String loadResource(String path) throws IOException {
        ClassLoader loader = EventExtractorTest.class.getClassLoader();
        try (InputStream stream = loader.getResourceAsStream(path)) {
            assertThat(stream).as("Resource %s not found", path).isNotNull();
            byte[] content = stream.readAllBytes();
            return new String(content, StandardCharsets.UTF_8);
        }
    }
}
FficeMessageClassifierTest
Create src/test/java/com/github/swim_developer/unit/FficeMessageClassifierTest.java:
package com.github.swim_developer.unit;
import com.github.swim_developer.extension.outbox.kafka.ffice.FficeEventCategory;
import com.github.swim_developer.extension.outbox.kafka.ffice.FficeMessageClassifier;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import static org.assertj.core.api.Assertions.assertThat;
/**
 * Unit tests for {@link FficeMessageClassifier}: category classification of raw FF-ICE XML
 * and GUFI extraction from the same raw text.
 */
class FficeMessageClassifierTest {

    /**
     * Every XML sample under {@code events/} maps to the category listed next to it.
     *
     * @param resourcePath     classpath location of the sample
     * @param expectedCategory name of the expected {@link FficeEventCategory} constant
     */
    @ParameterizedTest
    @CsvSource({
        "events/filed-flight-plan.xml, FLIGHT_PLAN",
        "events/flight-plan-update.xml, FLIGHT_UPDATE",
        "events/flight-departure.xml, OPERATIONS",
        "events/flight-arrival.xml, OPERATIONS",
        "events/flight-cancellation.xml, OPERATIONS",
        "events/planning-status.xml, FLIGHT_UPDATE",
        "events/filing-status.xml, FLIGHT_UPDATE"
    })
    void classifyRealXmlSamples(String resourcePath, String expectedCategory) throws IOException {
        FficeEventCategory expected = FficeEventCategory.valueOf(expectedCategory);

        FficeEventCategory actual = FficeMessageClassifier.classify(loadResource(resourcePath));

        assertThat(actual).isEqualTo(expected);
    }

    /** A message type the classifier does not know maps to {@code UNKNOWN}. */
    @Test
    void classifyUnknownMessage() {
        String unrecognised = "<ffice:FficeMessage><ffice:type>SOMETHING_NEW</ffice:type></ffice:FficeMessage>";

        FficeEventCategory actual = FficeMessageClassifier.classify(unrecognised);

        assertThat(actual).isEqualTo(FficeEventCategory.UNKNOWN);
    }

    /** The text of an unprefixed {@code globallyUniqueFlightIdentifier} element is returned as the GUFI. */
    @Test
    void extractGufiFromInlineElement() {
        String inline = "<FficeMessage><globallyUniqueFlightIdentifier>abc-123</globallyUniqueFlightIdentifier></FficeMessage>";

        assertThat(FficeMessageClassifier.extractGufi(inline)).isEqualTo("abc-123");
    }

    /**
     * A prefixed {@code fx:gufi} element is expected to yield {@code "unknown"}.
     *
     * <p>NOTE(review): the method name suggests extraction, yet the assertion pins the fallback
     * value. Presumably this documents a known limitation of the classifier; confirm the intent.</p>
     */
    @Test
    void extractGufiFromNamespacedElement() {
        String namespaced = "<ffice:FficeMessage><fx:gufi codeSpace=\"urn:uuid\">f47ac10b-58cc</fx:gufi></ffice:FficeMessage>";

        assertThat(FficeMessageClassifier.extractGufi(namespaced)).isEqualTo("unknown");
    }

    /** A message without any GUFI element yields {@code "unknown"}. */
    @Test
    void extractGufiReturnsUnknownWhenMissing() {
        String withoutGufi = "<ffice:FficeMessage><ffice:type>FILED_FLIGHT_PLAN</ffice:type></ffice:FficeMessage>";

        assertThat(FficeMessageClassifier.extractGufi(withoutGufi)).isEqualTo("unknown");
    }

    /**
     * Reads a classpath resource as UTF-8 text, failing the test if it does not exist.
     *
     * @param path resource path relative to the classpath root
     * @return the resource content
     * @throws IOException if the stream cannot be read
     */
    private static String loadResource(String path) throws IOException {
        ClassLoader loader = FficeMessageClassifierTest.class.getClassLoader();
        try (InputStream stream = loader.getResourceAsStream(path)) {
            assertThat(stream).as("Resource %s not found", path).isNotNull();
            byte[] content = stream.readAllBytes();
            return new String(content, StandardCharsets.UTF_8);
        }
    }
}
Step 7.2: Integration test prerequisites
The integration test requires two additions to pom.xml:
- Add the quarkus-wiremock dependency (provided scope) alongside the existing quarkus-wiremock-test:
<!-- WireMock extension artifact; provided scope keeps it on the compile and test classpath only. -->
<dependency>
<groupId>io.quarkiverse.wiremock</groupId>
<artifactId>quarkus-wiremock</artifactId>
<!-- Version comes from the wiremock.version property, which must be defined elsewhere in the pom. -->
<version>${wiremock.version}</version>
<scope>provided</scope>
</dependency>
- Configure the maven-failsafe-plugin with execution goals (replace the existing bare declaration):
<!-- Failsafe runs the integration tests; without the execution below the plugin is declared but never invoked. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<executions>
<execution>
<goals>
<!-- integration-test executes the tests, verify fails the build afterwards if any of them failed. -->
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
</execution>
</executions>
<configuration>
<!-- @{argLine} is a late-evaluated reference that keeps JVM flags set by other plugins and appends the Mockito agent. -->
<!-- NOTE(review): if no argLine property is defined anywhere, the literal placeholder reaches the JVM; confirm the pom declares a default. -->
<argLine>@{argLine} --add-opens java.base/java.lang=ALL-UNNAMED -javaagent:${settings.localRepository}/org/mockito/mockito-core/${mockito.version}/mockito-core-${mockito.version}.jar -Xshare:off</argLine>
<systemPropertyVariables>
<native.image.path>${project.build.directory}/${project.build.finalName}-runner</native.image.path>
<!-- Routes java.util.logging through the JBoss LogManager inside the forked test JVM. -->
<java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
<maven.home>${maven.home}</maven.home>
</systemPropertyVariables>
</configuration>
</plugin>
Step 7.3: Test profile (application-test.properties)
Create src/main/resources/application-test.properties:
# =============================================================================
# SWIM FF-ICE Consumer - Test Profile
# =============================================================================
# Renewal is off and the pre-configured subscription list is empty; the integration
# tests create or seed the subscriptions they need.
swim.subscription.renewal.enabled=false
ffice.subscriptions=[]
# NOTE(review): the same flag is set under both the ffice. and swim. prefixes;
# confirm which key the framework actually reads.
ffice.subscriptions.delete-and-recreate=false
swim.subscriptions.delete-and-recreate=false
# =============================================================================
# MongoDB DevServices (Testcontainers)
# =============================================================================
# Pins the container image used by the MongoDB Dev Service.
quarkus.mongodb.devservices.image-name=mongo:8.2.7
quarkus.mongodb.server-selection-timeout=5000
# =============================================================================
# SWIM Providers (Test - WireMock SM + DevServices AMQP)
# =============================================================================
# Single provider, declared as one-line JSON. The Subscription Manager URL points at
# the WireMock Dev Service port; the AMQP settings use ${name:default} expressions,
# falling back to localhost:5672 and guest/guest when the properties are not set.
swim.providers=[{"providerId":"test-provider","subscriptionManager":{"url":"http://localhost:${quarkus.wiremock.devservices.port}","tls":null,"resilience":{"connectTimeoutMs":5000,"readTimeoutMs":10000,"retryMaxAttempts":4,"retryDelayMs":500}},"amqpBroker":{"host":"${amqp-host:localhost}","port":${amqp-port:5672},"sslEnabled":false,"username":"${amqp-user:guest}","password":"${amqp-password:guest}","tls":null}}]
# =============================================================================
# Schedulers - DISABLED (999d delay prevents execution during tests)
# =============================================================================
swim.scheduler.initial-delay=999d
reconciliation.retry.interval=999d
reconciliation.retry.initial-delay=999d
swim.inbox.recovery.interval=999d
swim.outbox.recovery.interval=999d
swim.outbox.cleanup.interval=999d
# TLS reload is switched off rather than delayed.
swim.tls.reload-period=off
# =============================================================================
# OpenTelemetry - DISABLED
# =============================================================================
quarkus.otel.enabled=false
quarkus.otel.sdk.disabled=true
# =============================================================================
# OIDC/Keycloak - DISABLED
# =============================================================================
quarkus.oidc.enabled=false
quarkus.oidc.tenant-enabled=false
# =============================================================================
# Kubernetes Client DevServices - DISABLED
# =============================================================================
quarkus.kubernetes-client.devservices.enabled=false
# =============================================================================
# Management Endpoint - DISABLED
# =============================================================================
# With the management interface off, health endpoints are served on the main HTTP
# port, which is where the liveness test calls /q/health/live.
quarkus.management.enabled=false
# =============================================================================
# Fault Tolerance (Test-specific)
# =============================================================================
swim.subscription.api.timeout=10000
swim.subscription.api.max-retries=3
mp.fault.tolerance.interceptor.priority=5
smallrye.faulttolerance.global.retry.enabled=true
smallrye.faulttolerance.enabled=true
quarkus.smallrye-fault-tolerance.interceptor.enabled=true
# =============================================================================
# Per-Subscription Heartbeat Monitor (fast intervals for tests)
# =============================================================================
# Checked every second, with a 4 second tolerance.
swim.heartbeat.monitor.enabled=true
swim.heartbeat.monitor.tolerance=4s
swim.heartbeat.monitor.check-interval=1s
# =============================================================================
# Logging
# =============================================================================
# Test runs also write the log to target/app.log, with DEBUG for the project packages.
quarkus.log.file.enabled=true
quarkus.log.file.path=target/app.log
quarkus.log.category."com.github.swim_developer".level=DEBUG
Step 7.4: Integration test class
The integration test validates the full consumer lifecycle with real infrastructure. Each test has Javadoc explaining what it proves and why it matters for the SFG/CP1.
What these 19 tests demonstrate:
| Group | Tests | Framework capability proved |
|---|---|---|
| Subscription lifecycle | 7 | Full CRUD with SM, configHash deduplication, input validation |
| Event pipeline | 5 | JAXB extraction, DLQ routing, CP1 audit immutability, L1+L2 idempotency |
| Event routing | 2 | FF-ICE message classification to Kafka topics |
| Subscription guard | 1 | PAUSED subscriptions discard events before parsing |
| Observability | 3 | Stats, DLQ query, Kubernetes liveness probe |
| Self-healing | 1 | Automatic re-subscription when provider loses state (404) |
Create src/test/java/com/github/swim_developer/integration/FficeConsumerIT.java:
package com.github.swim_developer.integration;
import com.github.swim_developer.extension.outbox.kafka.ffice.FficeEventCategory;
import com.github.swim_developer.extension.outbox.kafka.ffice.FficeMessageClassifier;
import com.github.swim_developer.framework.application.model.OutboxDeliveryStatus;
import com.github.swim_developer.framework.application.model.ProcessingOutcome;
import com.github.swim_developer.framework.consumer.infrastructure.out.dlq.DeadLetterMessage;
import com.github.swim_developer.framework.infrastructure.util.HashUtil;
import com.github.swim_developer.domain.model.Event;
import com.github.swim_developer.domain.model.Subscription;
import com.github.swim_developer.framework.persistence.mongodb.MongoDeadLetterStore;
import com.github.swim_developer.infrastructure.out.persistence.MongoEventStore;
import com.github.swim_developer.infrastructure.out.persistence.MongoSubscriptionStore;
import com.github.swim_developer.application.usecase.EventProcessingUseCase;
import com.github.swim_developer.application.usecase.SubscriptionUseCase;
import com.github.swim_developer.framework.consumer.infrastructure.out.idempotency.AbstractIdempotencyCache;
import com.github.tomakehurst.wiremock.client.WireMock;
import io.quarkiverse.wiremock.devservice.ConnectWireMock;
import io.quarkus.cache.Cache;
import io.quarkus.cache.CacheName;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.http.ContentType;
import jakarta.inject.Inject;
import org.junit.jupiter.api.*;
import java.util.List;
import java.util.Map;
import static com.github.tomakehurst.wiremock.client.WireMock.*;
import static io.restassured.RestAssured.given;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Integration tests for the FF-ICE consumer with real infrastructure.
*
* <p>Uses Quarkus Dev Services (Testcontainers) to spin up:</p>
* <ul>
* <li><b>MongoDB</b> - event and subscription persistence</li>
* <li><b>Kafka (Redpanda)</b> - inbox/outbox event streaming</li>
* <li><b>WireMock</b> - simulates the SWIM Subscription Manager REST API</li>
* <li><b>Artemis</b> - AMQP 1.0 broker</li>
* </ul>
*
* <h2>What These Tests Prove to the SFG</h2>
* <ol>
* <li>The framework delivers a fully operational SWIM consumer from just 10 domain classes</li>
* <li>Self-healing: consumer recovers automatically when the provider loses subscription state</li>
* <li>Fault tolerance: retries with backoff handle transient network failures</li>
* <li>CP1 audit compliance: persisted payloads are immutable after persistence</li>
* <li>Idempotency: duplicate AMQP messages are detected via content hash (L1 + L2 cache)</li>
* <li>Event routing: FF-ICE messages are classified and dispatched to the correct Kafka topic</li>
* </ol>
*/
@QuarkusTest
@ConnectWireMock
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
class FficeConsumerIT {
/**
 * FF-ICE FILED_FLIGHT_PLAN message for flight TAP123 from LPPT to LFPG. Used as the
 * well-formed payload by the pipeline, idempotency, routing, guard and stats tests.
 */
private static final String VALID_FFICE_XML = """
<?xml version="1.0" encoding="UTF-8"?>
<ffice:FficeMessage xmlns:ffice="http://www.fixm.aero/app/ffice/1.1"
xmlns:fx="http://www.fixm.aero/flight/4.3"
xmlns:fb="http://www.fixm.aero/base/4.3">
<ffice:flight>
<fx:arrival>
<fx:destinationAerodrome>
<fb:locationIndicator>LFPG</fb:locationIndicator>
</fx:destinationAerodrome>
</fx:arrival>
<fx:departure>
<fx:departureAerodrome>
<fb:locationIndicator>LPPT</fb:locationIndicator>
</fx:departureAerodrome>
</fx:departure>
<fx:flightIdentification>
<fx:aircraftIdentification>TAP123</fx:aircraftIdentification>
<fx:gufi codeSpace="urn:uuid" creationTime="2026-05-06T07:00:00Z" namespaceDomain="FULLY_QUALIFIED_DOMAIN_NAME" namespaceIdentifier="swim-developer.github.io">f47ac10b-58cc-4372-a567-0e02b2c3d479</fx:gufi>
</fx:flightIdentification>
</ffice:flight>
<ffice:timestamp>2026-05-06T08:00:00.000Z</ffice:timestamp>
<ffice:type>FILED_FLIGHT_PLAN</ffice:type>
<ffice:uniqueMessageIdentifier codeSpace="urn:uuid">a1b2c3d4-e5f6-4890-abcd-ef1234567890</ffice:uniqueMessageIdentifier>
</ffice:FficeMessage>
""";
/** Payload that is not an FF-ICE message; the tests expect it to be rejected and stored in the DLQ. */
private static final String INVALID_XML = "<not-valid-fixm>broken</not-valid-fixm>";
// WireMock client used to register stubs and verify calls to the simulated Subscription Manager.
// Not annotated with @Inject; presumably populated through @ConnectWireMock on the class (confirm).
WireMock wiremock;
// Entry point of the event pipeline; tests call processAndPersistSingleMessage on it directly.
@Inject
EventProcessingUseCase eventProcessor;
// Stores queried by the assertions and wiped before every test.
@Inject
MongoEventStore eventRepository;
@Inject
MongoDeadLetterStore dlqRepository;
@Inject
MongoSubscriptionStore subscriptionRepository;
// Queried with isAlreadyProcessed(subscriptionId, contentHash) in the idempotency test.
@Inject
AbstractIdempotencyCache idempotencyCache;
@Inject
SubscriptionUseCase subscriptionService;
// Named Quarkus cache cleared before each test; presumably the in-memory (L1) idempotency tier.
@Inject
@CacheName("processed-messages")
Cache l1Cache;
/**
 * Gives every test a clean slate: prints a banner with the test name, empties the event,
 * DLQ and subscription stores, clears the in-memory cache and resets WireMock.
 *
 * @param testInfo JUnit metadata for the test about to run
 */
@BeforeEach
void cleanDatabase(TestInfo testInfo) {
    String className = getClass().getSimpleName();
    String testName = testInfo.getDisplayName();
    System.out.printf("%n== > %s.%s%n", className, testName);

    // Persistent state
    eventRepository.deleteAll();
    dlqRepository.deleteAll();
    subscriptionRepository.deleteAllSubscriptions();

    // In-memory cache: block until the invalidation has completed
    var invalidation = l1Cache.invalidateAll();
    invalidation.await().indefinitely();

    // WireMock: stub mappings, scenario state and request journal
    wiremock.removeMappings();
    wiremock.resetAllScenarios();
    wiremock.resetRequests();
}
// ─── Group 1: Subscription Lifecycle ─────────────────────────────────
/**
 * Creates a subscription through the consumer REST API and checks both ends of the exchange.
 *
 * <p>The Subscription Manager is stubbed for {@code sub-IT-001}. After the POST returns 201,
 * the subscription must exist in the local store with the stubbed queue name, and WireMock
 * must have received a POST to {@code /swim/v1/subscriptions} plus a PUT for the new id.</p>
 *
 * <p><b>Framework capability:</b> the test body contains no subscription management code;
 * the POST/PUT exchange with the SM is driven entirely by the application under test.</p>
 */
@Test
@Order(1)
void createSubscriptionEndToEnd() {
    stubSubscriptionManagerCreate("sub-IT-001", "FFICE-client-sub-IT-001");

    var request = given()
            .contentType(ContentType.JSON)
            .body(Map.of("topic", "ffice.v1", "description", "Integration test"));
    request.when()
            .post("/api/v1/subscriptions")
            .then()
            .statusCode(201);

    var stored = subscriptionRepository.findBySubscriptionId("sub-IT-001");
    assertThat(stored).isPresent();
    assertThat(stored.get().getQueueName()).isEqualTo("FFICE-client-sub-IT-001");

    var createCall = postRequestedFor(urlPathEqualTo("/swim/v1/subscriptions"));
    var activateCall = putRequestedFor(urlPathEqualTo("/swim/v1/subscriptions/sub-IT-001"));
    wiremock.verifyThat(createCall);
    wiremock.verifyThat(activateCall);
}
/**
 * Posting the same subscription request twice must not reach the Subscription Manager twice.
 *
 * <p>The first POST goes through the stubbed SM. The WireMock request journal is then
 * cleared and the identical body is posted again. The second response must carry the
 * original {@code subscriptionId}, and the journal must show zero POSTs to the SM.</p>
 *
 * <p><b>Framework capability:</b> config-hash deduplication guards against repeated
 * requests from operators or automated retries.</p>
 */
@Test
@Order(2)
void duplicateConfigHashReturnsExistingWithoutCallingSm() {
    stubSubscriptionManagerCreate("sub-dup-cfg-1", "FFICE-client-sub-dup-cfg-1");
    var payload = Map.of("topic", "ffice.v1", "description", "First call");

    given()
            .contentType(ContentType.JSON)
            .body(payload)
            .when()
            .post("/api/v1/subscriptions")
            .then()
            .statusCode(201);

    // Forget the first round-trip so that only calls caused by the second POST are counted.
    wiremock.resetRequests();

    var replay = given()
            .contentType(ContentType.JSON)
            .body(payload)
            .when()
            .post("/api/v1/subscriptions")
            .then()
            .statusCode(201)
            .extract()
            .jsonPath();

    String returnedId = replay.getString("subscriptionId");
    assertThat(returnedId).isEqualTo("sub-dup-cfg-1");
    wiremock.verifyThat(0, postRequestedFor(urlPathEqualTo("/swim/v1/subscriptions")));
}
/**
 * The list endpoint returns every stored subscription, whatever its status.
 *
 * <p>Two subscriptions (one ACTIVE, one PAUSED) are seeded directly; a GET on
 * {@code /api/v1/subscriptions} must answer 200 with a two-element JSON array.</p>
 */
@Test
@Order(3)
void listSubscriptionsFromMongoDB() {
    seedSubscription("sub-list-1", "ACTIVE");
    seedSubscription("sub-list-2", "PAUSED");

    var listing = given()
            .when()
            .get("/api/v1/subscriptions")
            .then()
            .statusCode(200)
            .extract()
            .body()
            .jsonPath();

    var entries = listing.getList("$");
    assertThat(entries).hasSize(2);
}
/**
 * Pausing through the consumer REST API updates the locally stored status.
 *
 * <p>An ACTIVE subscription is seeded and the SM update is stubbed to answer PAUSED.
 * A PUT carrying {@code subscription_status=PAUSED} must return 200, after which the
 * stored subscription reports PAUSED.</p>
 */
@Test
@Order(4)
void pauseSubscriptionViaApi() {
    seedSubscription("sub-pause-1", "ACTIVE");
    stubSubscriptionManagerUpdate("sub-pause-1", "PAUSED");
    var statusChange = Map.of("subscription_status", "PAUSED");

    given()
            .contentType(ContentType.JSON)
            .body(statusChange)
            .when()
            .put("/api/v1/subscriptions/sub-pause-1")
            .then()
            .statusCode(200);

    var reloaded = subscriptionRepository.findBySubscriptionId("sub-pause-1");
    assertThat(reloaded).isPresent();
    assertThat(reloaded.get().getSubscriptionStatus()).isEqualTo("PAUSED");
}
/**
 * Resuming through the consumer REST API moves a stored subscription from PAUSED to ACTIVE.
 *
 * <p>A PAUSED subscription is seeded and the SM update is stubbed to answer ACTIVE.
 * A PUT carrying {@code subscription_status=ACTIVE} must return 200, after which the
 * stored subscription reports ACTIVE.</p>
 */
@Test
@Order(5)
void resumeSubscriptionViaApi() {
    seedSubscription("sub-resume-1", "PAUSED");
    stubSubscriptionManagerUpdate("sub-resume-1", "ACTIVE");
    var statusChange = Map.of("subscription_status", "ACTIVE");

    given()
            .contentType(ContentType.JSON)
            .body(statusChange)
            .when()
            .put("/api/v1/subscriptions/sub-resume-1")
            .then()
            .statusCode(200);

    var reloaded = subscriptionRepository.findBySubscriptionId("sub-resume-1");
    assertThat(reloaded).isPresent();
    assertThat(reloaded.get().getSubscriptionStatus()).isEqualTo("ACTIVE");
}
/**
 * Deleting through the consumer REST API removes the local record and calls the SM.
 *
 * <p>The SM DELETE is stubbed with a 204. After the consumer endpoint returns 204, the
 * subscription must be gone from the store and WireMock must have seen the DELETE.</p>
 */
@Test
@Order(6)
void deleteSubscriptionCleanup() {
    seedSubscription("sub-del-1", "ACTIVE");
    var smDeletePath = urlPathEqualTo("/swim/v1/subscriptions/sub-del-1");
    wiremock.register(delete(smDeletePath).willReturn(aResponse().withStatus(204)));

    given()
            .when()
            .delete("/api/v1/subscriptions/sub-del-1")
            .then()
            .statusCode(204);

    var remaining = subscriptionRepository.findBySubscriptionId("sub-del-1");
    assertThat(remaining).isEmpty();
    wiremock.verifyThat(deleteRequestedFor(urlPathEqualTo("/swim/v1/subscriptions/sub-del-1")));
}
/**
 * A create request without the {@code topic} field is answered with HTTP 400.
 *
 * <p>No Subscription Manager stub is registered for this test; the request is expected
 * to be rejected by input validation.</p>
 */
@Test
@Order(7)
void createSubscriptionWithoutTopicRejects() {
    var incomplete = Map.of("description", "no topic");

    var response = given()
            .contentType(ContentType.JSON)
            .body(incomplete)
            .when()
            .post("/api/v1/subscriptions");

    response.then().statusCode(400);
}
// ─── Group 2: Event Processing Pipeline ──────────────────────────────
/**
 * Happy path: a valid FF-ICE message is processed and stored with all extracted fields.
 *
 * <p>After one call to the processing use case, exactly one event must exist. It keeps
 * the subscription id, a non-empty content hash, the unmodified raw payload and a
 * delivery status of PENDING or SENT. The FF-ICE fields (message type, GUFI, aircraft
 * identification, departure and arrival aerodromes) must match the XML sample.</p>
 */
@Test
@Order(10)
void validFficePersistedWithFullMetadata() {
    eventProcessor.processAndPersistSingleMessage(
            "sub-pipe-1", "queue-1", "AMQP-MSG-001", VALID_FFICE_XML, 0);

    List<Event> stored = eventRepository.listAllDomain();
    assertThat(stored).hasSize(1);
    Event persisted = stored.get(0);

    // Envelope and bookkeeping fields
    assertThat(persisted.getSubscriptionId()).isEqualTo("sub-pipe-1");
    assertThat(persisted.getContentHash()).isNotEmpty();
    assertThat(persisted.getDeliveryStatus())
            .isIn(OutboxDeliveryStatus.PENDING, OutboxDeliveryStatus.SENT);
    assertThat(persisted.getRawPayload()).isEqualTo(VALID_FFICE_XML);

    // Fields extracted from the FF-ICE XML
    assertThat(persisted.getFficeMessageType()).isEqualTo("FILED_FLIGHT_PLAN");
    assertThat(persisted.getGufi()).contains("f47ac10b");
    assertThat(persisted.getAircraftIdentification()).isEqualTo("TAP123");
    assertThat(persisted.getDepartureAerodrome()).isEqualTo("LPPT");
    assertThat(persisted.getArrivalAerodrome()).isEqualTo("LFPG");
}
/**
 * A payload that is not FF-ICE XML produces no event and exactly one DLQ entry.
 *
 * <p>The DLQ entry must be typed {@code VALIDATION_ERROR} and keep the original raw
 * payload, so the rejected message stays available for audit.</p>
 */
@Test
@Order(11)
void invalidXmlRoutedToDlq() {
    try {
        eventProcessor.processAndPersistSingleMessage(
                "sub-dlq-1", "queue-1", "AMQP-INVALID-001", INVALID_XML, 0);
    } catch (RuntimeException ignored) {
        // expected: the rejection may surface as an exception; the stores are checked below
    }

    assertThat(eventRepository.listAll()).isEmpty();

    List<DeadLetterMessage> rejected = dlqRepository.listAllDomain();
    assertThat(rejected).hasSize(1);
    DeadLetterMessage entry = rejected.get(0);
    assertThat(entry.getErrorType()).isEqualTo("VALIDATION_ERROR");
    assertThat(entry.getRawPayload()).isEqualTo(INVALID_XML);
}
/**
 * CP1 audit rule: updating the delivery status must leave the audit-critical fields untouched.
 *
 * <p>An event is processed and its raw payload, content hash, subscription id and message
 * id are recorded. The delivery status is then set to SENT and saved through the store.
 * On reload, the four recorded fields must be unchanged and the status must be SENT.</p>
 */
@Test
@Order(12)
void auditFieldsRemainImmutableAfterPersistence() {
    eventProcessor.processAndPersistSingleMessage(
            "sub-audit-1", "queue-1", "AMQP-AUDIT-001", VALID_FFICE_XML, 0);

    Event before = eventRepository.listAllDomain().get(0);
    String payloadBefore = before.getRawPayload();
    String hashBefore = before.getContentHash();
    String subscriptionBefore = before.getSubscriptionId();
    String messageIdBefore = before.getMessageId();

    // Simulate the PENDING to SENT transition.
    before.setDeliveryStatus(OutboxDeliveryStatus.SENT);
    eventRepository.update(before);

    Event after = eventRepository.listAllDomain().get(0);
    assertThat(after.getRawPayload()).isEqualTo(payloadBefore);
    assertThat(after.getContentHash()).isEqualTo(hashBefore);
    assertThat(after.getSubscriptionId()).isEqualTo(subscriptionBefore);
    assertThat(after.getMessageId()).isEqualTo(messageIdBefore);
    assertThat(after.getDeliveryStatus()).isEqualTo(OutboxDeliveryStatus.SENT);
}
/**
 * The same payload delivered twice under different AMQP message ids is stored only once.
 *
 * <p>Both deliveries use the same subscription and identical content; only the message id
 * differs. Exactly one event must exist afterwards.</p>
 */
@Test
@Order(13)
void duplicateContentDiscardedByIdempotency() {
    for (String amqpMessageId : List.of("AMQP-DUP-001", "AMQP-DUP-002")) {
        eventProcessor.processAndPersistSingleMessage(
                "sub-dup-1", "queue-1", amqpMessageId, VALID_FFICE_XML, 0);
    }

    assertThat(eventRepository.listAll()).hasSize(1);
}
/**
 * Idempotency must survive the loss of the in-memory (L1) cache.
 *
 * <p>After one message is processed, the L1 cache is invalidated to mimic a pod restart
 * or cache eviction. The idempotency cache must still report the content hash as already
 * processed, which can then only be answered from the persistent (L2) tier.</p>
 *
 * <p>Without the invalidation step the lookup could be served from L1 alone, and the test
 * would pass even if nothing had been written to MongoDB.</p>
 */
@Test
@Order(14)
void idempotencyPersistsToMongoDbL2Cache() {
    String hash = HashUtil.sha256(VALID_FFICE_XML);

    eventProcessor.processAndPersistSingleMessage(
            "sub-idem-1", "queue-1", "AMQP-IDEM-001", VALID_FFICE_XML, 0);
    assertThat(eventRepository.listAll()).hasSize(1);

    // Drop the in-memory tier so the lookup below has to fall through to persistent storage.
    // Assumes the "processed-messages" cache is the L1 tier behind idempotencyCache (confirm).
    l1Cache.invalidateAll().await().indefinitely();

    assertThat(idempotencyCache.isAlreadyProcessed("sub-idem-1", hash)).isTrue();
}
// ─── Group 3: Event Routing ──────────────────────────────────────────
/**
 * The stored payload of a FILED_FLIGHT_PLAN message classifies as {@code FLIGHT_PLAN}.
 *
 * <p>The message goes through the pipeline first; the classifier is then applied to the
 * raw payload read back from the event store, not to the in-memory constant.</p>
 */
@Test
@Order(15)
void routingClassifiesFiledFlightPlanCorrectly() {
    eventProcessor.processAndPersistSingleMessage(
            "sub-route-1", "queue-1", "AMQP-ROUTE-001", VALID_FFICE_XML, 0);

    String storedPayload = eventRepository.listAllDomain().get(0).getRawPayload();
    FficeEventCategory category = FficeMessageClassifier.classify(storedPayload);

    assertThat(category).isEqualTo(FficeEventCategory.FLIGHT_PLAN);
}
/**
 * The stored payload of a FLIGHT_DEPARTURE message classifies as {@code OPERATIONS}.
 *
 * <p>The departure message is derived from the valid sample by swapping the message type.
 * After processing, the raw payload read back from the event store is classified.</p>
 */
@Test
@Order(16)
void routingClassifiesFlightDepartureCorrectly() {
    String departureXml = VALID_FFICE_XML.replace("FILED_FLIGHT_PLAN", "FLIGHT_DEPARTURE");

    eventProcessor.processAndPersistSingleMessage(
            "sub-route-dep", "queue-1", "AMQP-ROUTE-DEP", departureXml, 0);

    String storedPayload = eventRepository.listAllDomain().get(0).getRawPayload();
    FficeEventCategory category = FficeMessageClassifier.classify(storedPayload);

    assertThat(category).isEqualTo(FficeEventCategory.OPERATIONS);
}
// ─── Group 4: Subscription Guard ─────────────────────────────────────
/**
 * A message arriving for a PAUSED subscription is skipped and leaves no trace.
 *
 * <p>The processing call must report {@link ProcessingOutcome#SKIPPED}, and neither the
 * event store nor the DLQ may contain an entry afterwards.</p>
 */
@Test
@Order(17)
void pausedSubscriptionDiscardsEvents() {
    seedSubscription("sub-paused-discard", "PAUSED");

    var result = eventProcessor.processAndPersistSingleMessage(
            "sub-paused-discard", "queue-1", "AMQP-PAUSED-001", VALID_FFICE_XML, 0);

    assertThat(result).isEqualTo(ProcessingOutcome.SKIPPED);
    var storedEvents = eventRepository.listAllDomain();
    var deadLetters = dlqRepository.listAllDomain();
    assertThat(storedEvents).isEmpty();
    assertThat(deadLetters).isEmpty();
}
// ─── Group 5: Observability ──────────────────────────────────────────
/**
 * The stats endpoint reports the counts produced by a mix of accepted and rejected messages.
 *
 * <p>With one ACTIVE subscription, one valid message and one invalid message, a GET on
 * {@code /swim/v1/operational/stats} must report one event, one DLQ entry and one active
 * subscription.</p>
 */
@Test
@Order(20)
void statsReflectRealState() {
    seedSubscription("sub-stats-1", "ACTIVE");
    eventProcessor.processAndPersistSingleMessage(
            "sub-stats-1", "queue-1", "AMQP-STATS-001", VALID_FFICE_XML, 0);
    try {
        eventProcessor.processAndPersistSingleMessage(
                "sub-stats-1", "queue-1", "AMQP-STATS-002", INVALID_XML, 0);
    } catch (RuntimeException ignored) {
        // expected: the invalid payload may surface as an exception; only the counters matter
    }

    var stats = given()
            .when()
            .get("/swim/v1/operational/stats")
            .then()
            .statusCode(200)
            .extract()
            .body()
            .jsonPath();

    long totalEvents = stats.getLong("totalEvents");
    long totalDlq = stats.getLong("totalDlq");
    int activeSubscriptions = stats.getInt("activeSubscriptions");
    assertThat(totalEvents).isEqualTo(1);
    assertThat(totalDlq).isEqualTo(1);
    assertThat(activeSubscriptions).isEqualTo(1);
}
/**
 * The paginated DLQ endpoint exposes a rejected message.
 *
 * <p>After one invalid payload has been processed, the first page (size 10) of
 * {@code /swim/v1/operational/dlq} must contain exactly one entry under {@code content}.</p>
 */
@Test
@Order(21)
void queryDlqAfterRejection() {
    try {
        eventProcessor.processAndPersistSingleMessage(
                "sub-dlq-q", "queue-1", "AMQP-DLQ-Q-001", INVALID_XML, 0);
    } catch (RuntimeException ignored) {
        // expected: the invalid payload may surface as an exception; the endpoint is checked below
    }

    var page = given()
            .when()
            .get("/swim/v1/operational/dlq?page=0&size=10")
            .then()
            .statusCode(200)
            .extract()
            .body()
            .jsonPath();

    var entries = page.getList("content");
    assertThat(entries).hasSize(1);
}
/**
 * The liveness endpoint {@code /q/health/live} answers HTTP 200 while the application runs.
 *
 * <p>This is the endpoint a Kubernetes or OpenShift liveness probe would call.</p>
 */
@Test
@Order(22)
void livenessProbeUp() {
    var probeResponse = given()
            .when()
            .get("/q/health/live");

    probeResponse.then().statusCode(200);
}
// ─── Group 6: Self-Healing ───────────────────────────────────────────
/**
 * Provider returns 404 during resume: the stale local subscription is expected to be
 * deleted and a full re-subscription cycle (POST, PAUSED, ACTIVE) to be triggered.
 *
 * <p>Scenario: an ACTIVE subscription {@code sub-lost-1} is seeded locally, the
 * provider stub answers 404 to the PUT for that id, and the create/update stubs
 * for {@code sub-recovered-1} are registered. After calling
 * {@code resumeSubscription} (any exception is tolerated), the test expects the old
 * subscription to be gone, the new one to be stored as ACTIVE, and a POST to
 * {@code /swim/v1/subscriptions} to have been sent.</p>
 *
 * <p><b>This is the gold standard for the SFG.</b> In production, SWIM providers
 * (EUROCONTROL, Austrocontrol, LFV) may lose subscription state after upgrades,
 * disaster recovery, or database migrations. Without self-healing, an ANSP would
 * need manual intervention to restore data flow. The framework detects the 404,
 * cleans up the orphan, and recreates the subscription automatically.</p>
 */
@Test
@Order(40)
void automaticResubscriptionOnProviderStateLoss() {
    final String lostId = "sub-lost-1";
    final String recoveredId = "sub-recovered-1";

    seedSubscription(lostId, "ACTIVE");

    // Provider no longer knows the seeded subscription: every PUT for it yields 404.
    var providerLostState = put(urlPathEqualTo("/swim/v1/subscriptions/" + lostId))
            .willReturn(aResponse().withStatus(404));
    wiremock.register(providerLostState);

    // Stubs for the replacement subscription (create, then update to ACTIVE).
    stubSubscriptionManagerCreate(recoveredId, "FFICE-client-" + recoveredId);

    try {
        subscriptionService.resumeSubscription(lostId);
    } catch (Exception ignored) {
        // expected: provider returns 404 for lost subscription
    }

    var staleLookup = subscriptionRepository.findBySubscriptionId(lostId);
    assertThat(staleLookup)
            .as("Old subscription must be deleted after provider 404")
            .isEmpty();

    var recoveredLookup = subscriptionRepository.findBySubscriptionId(recoveredId);
    assertThat(recoveredLookup)
            .as("New subscription must be created after provider state loss recovery")
            .hasValueSatisfying(recovered ->
                    assertThat(recovered.getSubscriptionStatus()).isEqualTo("ACTIVE"));

    wiremock.verifyThat(postRequestedFor(urlPathEqualTo("/swim/v1/subscriptions")));
}
// ── Helpers ──────────────────────────────────────────────────────────
/**
 * Registers a WireMock stub for {@code POST /swim/v1/subscriptions} that answers
 * 201 with a JSON body describing a PAUSED subscription, and additionally registers
 * the update stub for the same id with status {@code ACTIVE}.
 *
 * @param subscriptionId id placed in the stubbed response and used for the update stub
 * @param queueName      queue name placed in the stubbed response
 */
private void stubSubscriptionManagerCreate(String subscriptionId, String queueName) {
    String createdBody = String.format("""
            {
            "subscriptionId": "%s",
            "subscriptionStatus": "PAUSED",
            "queueName": "%s",
            "topic": "ffice.v1",
            "description": "Integration test"
            }
            """, subscriptionId, queueName);

    var createdResponse = aResponse()
            .withStatus(201)
            .withHeader("Content-Type", "application/json")
            .withBody(createdBody);
    wiremock.register(post(urlPathEqualTo("/swim/v1/subscriptions")).willReturn(createdResponse));

    // The follow-up PUT for the new subscription is stubbed to report ACTIVE.
    stubSubscriptionManagerUpdate(subscriptionId, "ACTIVE");
}
/**
 * Registers a WireMock stub for {@code PUT /swim/v1/subscriptions/{subscriptionId}}
 * that answers 200 with a JSON body carrying the given status and the queue name
 * {@code FFICE-client-<subscriptionId>}.
 *
 * @param subscriptionId id used in the stubbed URL path and response body
 * @param newStatus      value reported as {@code subscriptionStatus} in the response
 */
private void stubSubscriptionManagerUpdate(String subscriptionId, String newStatus) {
    String updatedBody = String.format("""
            {
            "subscriptionId": "%s",
            "subscriptionStatus": "%s",
            "queueName": "FFICE-client-%s",
            "topic": "ffice.v1"
            }
            """, subscriptionId, newStatus, subscriptionId);

    var updatedResponse = aResponse()
            .withStatus(200)
            .withHeader("Content-Type", "application/json")
            .withBody(updatedBody);
    String subscriptionPath = "/swim/v1/subscriptions/" + subscriptionId;
    wiremock.register(put(urlPathEqualTo(subscriptionPath)).willReturn(updatedResponse));
}
/**
 * Persists a subscription directly through the repository so a test can start from
 * a known local state.
 *
 * <p>Queue name and config hash are derived from the id
 * ({@code FFICE-client-<id>} and {@code test-hash-<id>}); topic, description,
 * type and provider id are fixed test values.</p>
 *
 * @param subscriptionId id of the subscription to store
 * @param status         value stored as the subscription status
 */
private void seedSubscription(String subscriptionId, String status) {
    String queueName = "FFICE-client-" + subscriptionId;
    String configHash = "test-hash-" + subscriptionId;
    String declaredType =
            com.github.swim_developer.framework.domain.model.SubscriptionType.DECLARED.name();

    Subscription seeded = new Subscription();
    seeded.setSubscriptionId(subscriptionId);
    seeded.setQueueName(queueName);
    seeded.setSubscriptionStatus(status);
    seeded.setTopic("ffice.v1");
    seeded.setDescription("Seeded for test");
    seeded.setType(declaredType);
    seeded.setConfigHash(configHash);
    seeded.setProviderId("test-provider");

    subscriptionRepository.persistSubscription(seeded);
}
}
Step 7.5: Run tests
From the swim-ffice-consumer directory:
# Unit tests only
./mvnw test
# Unit + integration tests
./mvnw verify -DskipITs=false
# Integration tests only (skip unit tests)
./mvnw verify -DskipITs=false -Dsurefire.skip=true
Expected output: 17 unit tests + 19 integration tests = 36 tests, 0 failures.
The integration tests cover 6 critical areas:
- Subscription lifecycle (7 tests): create, list, pause, resume, delete, duplicate detection, input validation
- Event pipeline (5 tests): full metadata extraction, DLQ routing, CP1 audit immutability, L1+L2 idempotency
- Event routing (2 tests): FF-ICE message classification (FILED_FLIGHT_PLAN, FLIGHT_DEPARTURE)
- Subscription guard (1 test): events for PAUSED subscriptions are discarded before JAXB parsing
- Observability (3 tests): aggregate stats, DLQ query with pagination, Kubernetes liveness probe
- Self-healing (1 test): automatic re-subscription when provider loses state (404 recovery)