Part 2: Provider Service (swim-ffice-provider)
The provider is a Quarkus service that:
- Ingests FF-ICE messages from a Kafka topic (ffice-events-all-topic)
- Validates each message against the FIXM/FF-ICE XSD schemas via JAXB
- Persists valid events in PostgreSQL with RECEIVED status
- Delivers events asynchronously to subscribed consumers via AMQP (ActiveMQ Artemis)
- Exposes the SWIM Subscription Manager REST API (/swim/v1/subscriptions, /swim/v1/topics)
2.1 Generate the provider project
Run the archetype from your working directory:
mvn archetype:generate \
    -DarchetypeGroupId=com.github.swim-developer \
    -DarchetypeArtifactId=swim-provider-archetype \
    -DarchetypeVersion=1.0.0-SNAPSHOT \
    -DgroupId=com.github.swim-developer \
    -DartifactId=swim-ffice-provider \
    -Dversion=1.0.0-SNAPSHOT \
    -Dpackage=com.github.swim_developer.ffice.provider \
    -DserviceName=ffice \
    -DserviceDisplayName="FF-ICE" \
    -DservicePrefix=Ffice \
    -DdataModel=FIXM \
    -DtablePrefix=ffice \
    -DqueuePrefix=FFICE \
    -DtopicName=FficeService \
    -DmodelArtifactId=swim-fixm-ffice-model \
    -DinteractiveMode=false
cd swim-ffice-provider
chmod +x mvnw # Linux / macOS only
2.2 Add the data model dependency
In pom.xml, add the swim-fixm-ffice-model dependency after the swim-framework-provider dependency:
<dependency>
    <groupId>com.github.swim-developer</groupId>
    <artifactId>swim-fixm-ffice-model</artifactId>
    <version>1.0.0-SNAPSHOT</version>
</dependency>
2.3 Define the filterable event model
The FilterableEvent record carries the metadata used by the delivery engine to match events against subscription filters. Replace the generated placeholder in src/main/java/.../domain/model/FilterableEvent.java:
@RegisterForReflection
public record FilterableEvent(
        String messageId,
        String gufi,
        String messageType,
        String payload
) {
}
2.3.1 Customize the stored event domain model
The archetype generates StoredEvent.java and EventJpaEntity.java without FF-ICE-specific fields. You must add gufi and messageType to both before proceeding — these fields are required by IngressMessageHandler and EventDeliveryUseCase.
In domain/model/StoredEvent.java, add two fields to the existing class:
@Data @Builder @NoArgsConstructor @AllArgsConstructor
public class StoredEvent implements SwimProviderEvent {
    private String eventId;
    private String gufi;        // <-- add
    private String messageType; // <-- add
    @Builder.Default
    private EventStatus status = EventStatus.RECEIVED;
    // ... remaining fields unchanged ...
}
In infrastructure/out/persistence/entity/EventJpaEntity.java, add the matching columns:
@Entity
@Table(name = "ffice_events", indexes = {
        @Index(name = "idx_received_at", columnList = "receivedAt"),
        @Index(name = "idx_status", columnList = "status")
})
@Data @Builder @NoArgsConstructor @AllArgsConstructor
public class EventJpaEntity {
    @Id @Column(length = 100)
    private String eventId;
    @Column(length = 255)
    private String gufi;        // <-- add
    @Column(length = 50)
    private String messageType; // <-- add
    // ... remaining fields unchanged ...
}
The persistence mapper (generated by the archetype) must also copy these fields. Verify that ProviderPersistenceMapper maps gufi and messageType in both toJpa() and toDomain().
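As a rough illustration of what "maps gufi and messageType in both directions" means, the sketch below uses simplified, hypothetical stand-ins (StoredEventSketch, EventJpaEntitySketch, PersistenceMapperSketch) in plain Java. They are not the generated classes, and the real ProviderPersistenceMapper may be implemented differently (for example with a mapping library); the point is only that a round trip through toJpa() and toDomain() should preserve both new fields.

```java
// Hypothetical, simplified stand-ins for the generated classes (no Lombok, no JPA).
record StoredEventSketch(String eventId, String gufi, String messageType, String status) {}

final class EventJpaEntitySketch {
    String eventId;
    String gufi;
    String messageType;
    String status;
}

final class PersistenceMapperSketch {
    private PersistenceMapperSketch() {}

    // Domain -> JPA: every field, including the two new ones, is copied.
    static EventJpaEntitySketch toJpa(StoredEventSketch event) {
        EventJpaEntitySketch entity = new EventJpaEntitySketch();
        entity.eventId = event.eventId();
        entity.gufi = event.gufi();
        entity.messageType = event.messageType();
        entity.status = event.status();
        return entity;
    }

    // JPA -> domain: the reverse copy.
    static StoredEventSketch toDomain(EventJpaEntitySketch entity) {
        return new StoredEventSketch(entity.eventId, entity.gufi, entity.messageType, entity.status);
    }
}

class MapperRoundTripDemo {
    public static void main(String[] args) {
        StoredEventSketch original =
                new StoredEventSketch("evt-1", "gufi-1", "FILED_FLIGHT_PLAN", "RECEIVED");
        StoredEventSketch roundTripped =
                PersistenceMapperSketch.toDomain(PersistenceMapperSketch.toJpa(original));
        // Records compare by value, so a field dropped by either mapping makes this false.
        System.out.println(original.equals(roundTripped));
    }
}
```

A round-trip check of this kind is a cheap unit test to add for the real mapper: if either direction forgets a field, the comparison fails.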
2.4 Implement the JAXB unmarshaller pool
Create src/main/java/.../infrastructure/out/xml/JaxbUnmarshallerPool.java as a thin adapter that delegates to the model's FficeUnmarshallerPool:
@Slf4j
@ApplicationScoped
public class JaxbUnmarshallerPool implements SwimXmlUnmarshallerPort<FficeMessageType> {
    private FficeUnmarshallerPool pool;

    @PostConstruct
    void initialize() {
        this.pool = new FficeUnmarshallerPool();
        log.info("FF-ICE JAXB unmarshaller pool initialized from swim-fixm-ffice-model");
    }

    @Override
    public FficeMessageType unmarshalAndValidate(String xml) throws XmlValidationException {
        try {
            Object result = pool.unmarshalAndValidate(xml);
            if (result instanceof FficeMessageType message) {
                return message;
            }
            throw new XmlValidationException("Unexpected root type: "
                    + (result != null ? result.getClass().getName() : "null"));
        } catch (FficeUnmarshallerPool.FficeUnmarshalException e) {
            throw new XmlValidationException(e.getMessage(), e);
        }
    }
}
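The internals of FficeUnmarshallerPool are not shown here, so the following is only a sketch of the general technique of validating XML against an XSD, using the JDK's javax.xml.validation API rather than JAXB. The toy schema and the XsdValidationSketch class are assumptions made for illustration; the real pool validates against the FIXM/FF-ICE schemas and also unmarshals into model objects.

```java
import java.io.IOException;
import java.io.StringReader;
import javax.xml.XMLConstants;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import javax.xml.validation.Validator;
import org.xml.sax.SAXException;

final class XsdValidationSketch {
    // Toy schema standing in for the FIXM/FF-ICE XSDs: <message> with a single <type> child.
    static final String TOY_XSD = """
            <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
              <xs:element name="message">
                <xs:complexType>
                  <xs:sequence>
                    <xs:element name="type" type="xs:string"/>
                  </xs:sequence>
                </xs:complexType>
              </xs:element>
            </xs:schema>
            """;

    private final Schema schema;

    XsdValidationSketch(String xsd) {
        try {
            // A compiled Schema is thread-safe, so one instance can be shared.
            SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
            this.schema = factory.newSchema(new StreamSource(new StringReader(xsd)));
        } catch (SAXException e) {
            throw new IllegalStateException("Schema could not be compiled", e);
        }
    }

    boolean isValid(String xml) {
        try {
            // A Validator is not thread-safe: create one per call, or pool them.
            Validator validator = schema.newValidator();
            validator.validate(new StreamSource(new StringReader(xml)));
            return true;
        } catch (SAXException | IOException e) {
            // Covers both malformed XML and schema violations.
            return false;
        }
    }
}
```

Compiling the schema once and creating (or pooling) the per-call objects is the usual reason such a component is built as a pool: schema compilation is expensive, while the validating objects are cheap but not safe to share between threads.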
2.5 Implement the event extractor
Create src/main/java/.../infrastructure/out/xml/EventExtractor.java. This class reads a fully-parsed FficeMessageType and returns a FilterableEvent with the identifiers needed for routing and deduplication:
@Slf4j
@ApplicationScoped
public class EventExtractor implements SwimEventExtractor<FilterableEvent, FficeMessageType> {
    private static final String UNKNOWN = "unknown";

    @Override
    public List<Optional<FilterableEvent>> extract(FficeMessageType message) {
        if (message == null) {
            return List.of(Optional.empty());
        }
        try {
            String messageId = extractMessageId(message);
            String gufi = extractGufi(message);
            String messageType = extractMessageType(message);
            return List.of(Optional.of(new FilterableEvent(messageId, gufi, messageType, null)));
        } catch (RuntimeException e) {
            log.error("Failed to extract FF-ICE event metadata", e);
            return List.of(Optional.empty());
        }
    }

    private String extractMessageId(FficeMessageType message) {
        UniversallyUniqueIdentifierType uid = message.getUniqueMessageIdentifier();
        return (uid != null && uid.getValue() != null) ? uid.getValue() : UNKNOWN;
    }

    private String extractGufi(FficeMessageType message) {
        FlightType flight = message.getFlight();
        if (flight == null) return UNKNOWN;
        var flightIdElement = flight.getFlightIdentification();
        if (flightIdElement == null || flightIdElement.getValue() == null) return UNKNOWN;
        var gufiElement = flightIdElement.getValue().getGufi();
        if (gufiElement == null || gufiElement.getValue() == null) return UNKNOWN;
        String v = gufiElement.getValue().getValue();
        return (v != null && !v.isBlank()) ? v : UNKNOWN;
    }

    private String extractMessageType(FficeMessageType message) {
        MessageTypeType type = message.getType();
        return (type != null) ? type.value() : UNKNOWN;
    }
}
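The null checks in extractGufi all serve one rule: any missing link in the chain yields "unknown". The sketch below shows the same fallback behaviour with an Optional chain over hypothetical stand-in records (MessageSketch, FlightSketch, and so on). These are not the FIXM model types, which wrap some values in additional element objects, so treat this as an alternative style to consider rather than a drop-in replacement.

```java
import java.util.Optional;

// Hypothetical stand-ins for the nested model types.
record GufiSketch(String value) {}
record FlightIdSketch(GufiSketch gufi) {}
record FlightSketch(FlightIdSketch flightIdentification) {}
record MessageSketch(FlightSketch flight) {}

final class GufiExtractionSketch {
    static final String UNKNOWN = "unknown";

    private GufiExtractionSketch() {}

    // Optional.map short-circuits to empty as soon as any getter returns null.
    static String extractGufi(MessageSketch message) {
        return Optional.ofNullable(message)
                .map(MessageSketch::flight)
                .map(FlightSketch::flightIdentification)
                .map(FlightIdSketch::gufi)
                .map(GufiSketch::value)
                .filter(v -> !v.isBlank())
                .orElse(UNKNOWN);
    }
}
```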
2.6 Implement the ingress message handler
The ingress handler is called by the Kafka consumer for each incoming message. It orchestrates validation, extraction, persistence, and async delivery dispatch. Replace the generated placeholder in src/main/java/.../infrastructure/in/amqp/IngressMessageHandler.java.
The handler must implement the SwimIngressHandler interface and its processEvent(String xmlMessage) method. The full implementation follows this sequence:
- Call jaxbPool.unmarshalAndValidate(xmlMessage), which rejects invalid XML
- Call eventExtractor.extract(parsed), which extracts messageId, gufi, and messageType
- Persist the event with EventStatus.RECEIVED via ProviderEventStore
- Register an AfterCommitEventDispatcher to dispatch the event to the outbox after the transaction commits
Apply the following Fault Tolerance annotations to processEvent:
@Transactional
@Retry(maxRetries = 2, delay = 500)
@Timeout(value = 10, unit = ChronoUnit.SECONDS)
@CircuitBreaker(requestVolumeThreshold = 10, failureRatio = 0.5, delay = 30000)
@Bulkhead(value = 100)
@WithSpan("ffice.provider.process")
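The four-step sequence can be sketched in plain Java as follows. Everything in this block is a hypothetical stand-in: the injected function replaces the JAXB pool and the extractor, a map replaces ProviderEventStore, and a list of callbacks simulates the after-commit hook. It does not use the SwimIngressHandler interface or the annotations above; it only illustrates the ordering and why dispatch is deferred until after the commit.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class IngressFlowSketch {
    record Extracted(String messageId, String gufi, String messageType) {}

    private final Function<String, Extracted> validateAndExtract;
    final Map<String, String> statusByEventId = new LinkedHashMap<>(); // stand-in event store
    final List<String> outbox = new ArrayList<>();                     // stand-in outbox

    IngressFlowSketch(Function<String, Extracted> validateAndExtract) {
        this.validateAndExtract = validateAndExtract;
    }

    void processEvent(String xmlMessage) {
        List<Runnable> afterCommit = new ArrayList<>();
        // Steps 1 and 2: validate and extract. An exception here aborts before anything is stored.
        Extracted event = validateAndExtract.apply(xmlMessage);
        // Step 3: persist with RECEIVED status.
        statusByEventId.put(event.messageId(), "RECEIVED");
        // Step 4: register the dispatch, but do not run it yet.
        afterCommit.add(() -> outbox.add(event.messageId()));
        // Simulated commit: only now do the deferred callbacks run.
        afterCommit.forEach(Runnable::run);
    }

    // Toy parser for the demo: expects "messageId;gufi;messageType" instead of real XML.
    static Extracted toyParse(String raw) {
        String[] parts = raw == null ? new String[0] : raw.split(";");
        if (parts.length != 3) {
            throw new IllegalArgumentException("invalid message");
        }
        return new Extracted(parts[0], parts[1], parts[2]);
    }
}
```

Deferring step 4 until after the commit means a consumer is never handed an event that a rolled-back transaction failed to persist.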
2.7 Implement the subscription store query methods
Open src/main/java/.../infrastructure/out/persistence/JpaSubscriptionStore.java and implement the three abstract methods inherited from SwimSubscriptionRepository:
@Override
public List<Subscription> findByUserId(String userId) {
    return list("userId", userId).stream().map(mapper::toDomain).toList();
}

@Override
public boolean existsActiveOrPausedByQueue(String queue) {
    return count("queue = ?1 and (status = ?2 or status = ?3)", queue,
            SubscriptionStatus.ACTIVE, SubscriptionStatus.PAUSED) > 0;
}

@Override
public Optional<Subscription> findActiveOrPausedByQueueAndUser(String queue, String userId) {
    return find("queue = ?1 and userId = ?2 and (status = ?3 or status = ?4)",
            queue, userId, SubscriptionStatus.ACTIVE, SubscriptionStatus.PAUSED)
            .firstResultOptional().map(mapper::toDomain);
}
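To make the intended semantics of the three queries concrete, here is an in-memory equivalent written with streams. The types (StatusSketch, SubscriptionRowSketch, InMemorySubscriptionQueriesSketch) are hypothetical, and the DELETED status is an assumed example of a status that is neither ACTIVE nor PAUSED. A fake like this may be useful in unit tests that should not touch the database.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical stand-ins; DELETED is an assumed "neither active nor paused" status.
enum StatusSketch { ACTIVE, PAUSED, DELETED }

record SubscriptionRowSketch(String queue, String userId, StatusSketch status) {}

final class InMemorySubscriptionQueriesSketch {
    private final List<SubscriptionRowSketch> rows;

    InMemorySubscriptionQueriesSketch(List<SubscriptionRowSketch> rows) {
        this.rows = List.copyOf(rows);
    }

    private static boolean isActiveOrPaused(SubscriptionRowSketch row) {
        return row.status() == StatusSketch.ACTIVE || row.status() == StatusSketch.PAUSED;
    }

    // Mirrors list("userId", userId): all subscriptions of a user, whatever their status.
    List<SubscriptionRowSketch> findByUserId(String userId) {
        return rows.stream().filter(r -> r.userId().equals(userId)).toList();
    }

    // Mirrors the count(...) > 0 query: is the queue still in use by a live subscription?
    boolean existsActiveOrPausedByQueue(String queue) {
        return rows.stream().anyMatch(r -> r.queue().equals(queue) && isActiveOrPaused(r));
    }

    // Mirrors find(...).firstResultOptional(): first live subscription for queue and user.
    Optional<SubscriptionRowSketch> findActiveOrPausedByQueueAndUser(String queue, String userId) {
        return rows.stream()
                .filter(r -> r.queue().equals(queue) && r.userId().equals(userId) && isActiveOrPaused(r))
                .findFirst();
    }
}
```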
Also implement the toFilterableModel method in EventDeliveryUseCase.java:
@Override
protected FilterableEvent toFilterableModel(StoredEvent entity) {
    return new FilterableEvent(entity.getEventId(), entity.getGufi(),
            entity.getMessageType(), entity.getXmlMessage());
}
2.7.1 Customize the subscription domain model
The generated Subscription.java has a placeholder toFilter() that accepts every event. Replace it with a real filter on messageType:
public class Subscription implements SwimSubscription<FilterableEvent> {
    // ... existing fields ...
    private List<String> messageType = new ArrayList<>(); // <-- add

    @Override
    public Predicate<FilterableEvent> toFilter() {
        return event -> {
            if (event == null) return false;
            if (messageType == null || messageType.isEmpty()) return true; // no filter = accept all
            return messageType.contains(event.messageType());
        };
    }
}
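The behaviour of this filter can be checked in isolation. The sketch below reproduces the same predicate logic against a hypothetical EventSketch record so that it runs without the framework; OTHER_TYPE is a made-up message type used only to show a non-matching event.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in with the same components as FilterableEvent.
record EventSketch(String messageId, String gufi, String messageType, String payload) {}

final class MessageTypeFilterSketch {
    private MessageTypeFilterSketch() {}

    // Same rules as toFilter(): null events are rejected, an empty filter accepts everything.
    static Predicate<EventSketch> toFilter(List<String> messageType) {
        return event -> {
            if (event == null) return false;
            if (messageType == null || messageType.isEmpty()) return true;
            return messageType.contains(event.messageType());
        };
    }
}

class MessageTypeFilterDemo {
    public static void main(String[] args) {
        List<String> wanted = new ArrayList<>(List.of("FILED_FLIGHT_PLAN"));
        Predicate<EventSketch> filter = MessageTypeFilterSketch.toFilter(wanted);
        EventSketch filed = new EventSketch("m1", "gufi-1", "FILED_FLIGHT_PLAN", null);
        EventSketch other = new EventSketch("m2", "gufi-2", "OTHER_TYPE", null);
        System.out.println(filter.test(filed)); // matching type
        System.out.println(filter.test(other)); // non-matching type
    }
}
```

One detail worth keeping from the original: the field is an ArrayList. Immutable lists created with List.of throw a NullPointerException from contains(null), so an event with a null messageType would fail against such a list instead of simply not matching.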
Add messageType to SubscriptionCommand.java and SubscriptionResult.java as well:
// SubscriptionCommand.java
public record SubscriptionCommand(
        String userId,
        List<String> messageType // <-- add
) {}

// SubscriptionResult.java
public record SubscriptionResult(
        String subscriptionId,
        String queueName,
        String subscriptionStatus,
        String heartbeatQueue,
        List<String> messageType // <-- add
) {}
2.7.2 Customize the subscription JPA entity and mappers
Add the messageType column to SubscriptionJpaEntity.java:
@Column(columnDefinition = "TEXT")
@Convert(converter = StringListConverter.class)
@Builder.Default
private List<String> messageType = new ArrayList<>();
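StringListConverter is referenced above but not shown. If your project does not already provide it, the conversion it needs to perform could look like the sketch below. The comma-separated storage format is an assumption (the real converter may use a different encoding, such as JSON), and the class here holds only the plain conversion logic; in a JPA converter these two methods would back AttributeConverter.convertToDatabaseColumn and convertToEntityAttribute.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class StringListCodecSketch {
    // Assumed storage format: values joined by commas. Values must not contain a comma.
    private static final String SEPARATOR = ",";

    private StringListCodecSketch() {}

    // List -> TEXT column value; an absent or empty list is stored as NULL.
    static String toColumn(List<String> values) {
        if (values == null || values.isEmpty()) {
            return null;
        }
        return String.join(SEPARATOR, values);
    }

    // TEXT column value -> mutable list; NULL or blank becomes an empty list.
    static List<String> fromColumn(String column) {
        if (column == null || column.isBlank()) {
            return new ArrayList<>();
        }
        return new ArrayList<>(Arrays.asList(column.split(SEPARATOR)));
    }
}
```

Returning an empty list rather than null on read keeps the entity consistent with the @Builder.Default initializer, so the "no filter" case looks the same whether the row was just built or loaded from the database.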
Update SubscriptionRequest.java and SubscriptionResponse.java (REST DTOs):
// SubscriptionRequest.java — add field
@JsonProperty("message_type")
@JsonAlias("messageType")
private List<String> messageType;
// SubscriptionResponse.java — add field
@JsonProperty("message_type")
private List<String> messageType;
Update ProviderSubscriptionMappingAdapter.java to copy the field in both directions (command → entity, entity → result), and ProviderSubscriptionMapper.java to copy it between REST DTOs and domain commands/results.
After these changes, a subscription created with {"message_type":["FILED_FLIGHT_PLAN"]} will only receive events whose messageType equals FILED_FLIGHT_PLAN. A subscription without message_type receives all events.
2.8 Configure the application
Create src/main/resources/application-dev.properties for the local development profile:
# Database
quarkus.hibernate-orm.schema-management.strategy=drop-and-create
quarkus.datasource.username=postgres
quarkus.datasource.password=postgres
quarkus.datasource.jdbc.url=jdbc:postgresql://localhost:5432/swim-ffice
# Messaging
amqp-host=localhost
amqp-port=5671
amqp-username=admin
amqp-password=admin
kafka.bootstrap.servers=localhost:9092
artemis.broker.name=amq-broker
# AMQP TLS
mp.messaging.outgoing.ffice-amqp-out.use-ssl=true
mp.messaging.outgoing.ffice-amqp-out.sni-server-name=${amqp-host}
mp.messaging.outgoing.ffice-amqp-out.tls-configuration-name=amqp-client
quarkus.tls.amqp-client.trust-store.pem.certs=certs/ca.crt
quarkus.tls.amqp-client.key-store.pem.0.cert=certs/client.crt
quarkus.tls.amqp-client.key-store.pem.0.key=certs/client.key
# HTTPS
quarkus.http.ssl.certificate.files=certs/tls.crt
quarkus.http.ssl.certificate.key-files=certs/tls.key
quarkus.http.ssl.certificate.trust-store-file=certs/ca.crt
quarkus.http.ssl.certificate.trust-store-file-type=PEM
quarkus.http.ssl.client-auth=request
quarkus.http.insecure-requests=enabled
# OIDC
quarkus.oidc.enabled=true
quarkus.oidc.auth-server-url=https://keycloak.swim.lab:8543/realms/swim
quarkus.oidc.credentials.secret=BtYGyfj6R1YvoKhEsDfjg0aMxSc8VyJ5
quarkus.tls.trust-all=true
quarkus.security.auth.enabled-in-dev-mode=true
# DevServices (DISABLED - use compose.yml)
quarkus.devservices.enabled=false
# OpenTelemetry (DISABLED in dev)
quarkus.otel.enabled=false
quarkus.otel.sdk.disabled=true
# Heartbeat
swim.heartbeat.interval=10s
swim.heartbeat.provider-id=ffice-provider-dev
# Subscription Expiry
swim.subscription.expiry.check-interval=30s
swim.subscription.expiry.purge-delay=5m
swim.subscription.expiry.default-ttl=1h
# OpenAPI
mp.openapi.servers=${OPENAPI_SERVERS:https://localhost:8443,http://localhost:8080}
Create src/main/resources/application-test.properties for the automated test profile:
# Database
quarkus.hibernate-orm.schema-management.strategy=drop-and-create
# OIDC (dummy URL - tests use permit policy)
quarkus.oidc.auth-server-url=http://localhost:0/realms/test
# SSL/TLS (DISABLED)
quarkus.http.ssl.certificate.files=
quarkus.http.ssl.certificate.key-files=
quarkus.http.ssl.certificate.trust-store-file=
quarkus.http.ssl.client-auth=none
quarkus.http.insecure-requests=enabled
quarkus.http.test-ssl-port=-1
# Security (permit all for tests)
quarkus.http.auth.permission.swim-api.policy=permit
# Messaging (DISABLED)
mp.messaging.incoming.ffice-events.enabled=false
mp.messaging.outgoing.ffice-amqp-out.enabled=false
# OpenTelemetry (DISABLED)
quarkus.otel.sdk.disabled=true
# Schedulers / Fault Tolerance (DISABLED)
quarkus.scheduler.enabled=false
quarkus.fault-tolerance.enabled=false
# SWIM Domain
swim.topics=FficeService,TestTopic
swim.heartbeat.enabled=false
swim.heartbeat.provider-id=test-provider
swim.heartbeat.interval=999d
swim.kubernetes.namespace=test
swim.subscription.expiry.check-interval=999d
swim.subscription.expiry.purge-interval=999d
# Messaging overrides
amqp-host=localhost
artemis.broker.name=test-broker
2.9 Build
./mvnw clean package -DskipTests # Linux / macOS
mvnw.cmd clean package -DskipTests # Windows
The build must succeed with BUILD SUCCESS before proceeding.