Consumer - Code Flow
This page explains the runtime sequence implemented by the consumer service. Every step is drawn directly from the consumer's services and modules.
Message Intake and Validation
Incoming messages from the raw_content_aa Kafka topic are received by the application controller and immediately handed off to the consumer service's process() method. The first thing process() does is run a Zod schema validation against the raw message payload. It expects three fields: id (the source identifier), baslik (headline), and ozet (abstract). If any field is missing or fails type validation, a warning is logged and the method returns early. The message is considered consumed from Kafka's perspective and will not be retried.
Deduplication
Before any expensive work is done, the service checks whether this source ID has already been processed. A Redis key is computed as the SHA-256 hash of the source ID concatenated with the source enum value (AA). The service calls EXISTS on this key — if the key is present, processing stops immediately without touching the database or the model.
If Redis returns a miss, the service falls back to a Postgres lookup. It queries the contents table for the given source_id and checks whether model_kategori is already populated. If it is, the Redis key is backfilled with a 3-hour TTL and the message is skipped. This means a warm Redis cache is rebuilt gradually from the database after a Redis restart, and duplicate inference is still prevented.
gRPC Inference
The consumer service calls ModelService.Predict on the gRPC model service with the source ID, headline (baslik), and abstract (ozet) exactly as they were received in the Kafka message. The producer is responsible for pre-cleaning the text before emitting — no additional normalization happens in the consumer. The gRPC client is initialized during the module's OnModuleInit lifecycle hook using ClientGrpc.getService(). The call returns an Observable which is resolved via firstValueFrom(), making it a one-shot async operation — the consumer waits for a single response and does not stream.
The response carries three fields: predicted_category (an integer index), confidence (a float representing the top score), and all_confidences (a map of category name strings to their individual scores).
Category Resolution
The integer predicted_category from the response is mapped to the internal Kategori enum using a MODEL_INDEX_TO_KATEGORI lookup table. If the model returns a null or missing predicted_category, the service falls back to computing the argmax over the all_confidences map — it finds the category name with the highest score, maps that name to an index via CATEGORY_NAME_TO_INDEX, and then resolves the enum from there. This means classification always produces a result as long as all_confidences is populated.
Persistence
Three sequential writes happen after a successful inference:
Content row upsert: the service checks whether a contents row already exists for the source ID. If it does but lacks a category, it is updated with model_kategori and processed_at. If no row exists, a new minimal record is created — only source_id, source, model_kategori, and processed_at are written. Fields like headline, abstract, and body are not written by the consumer; those are the producer's responsibility.
Classification result insert: a new row is always inserted into content_results, storing the predicted_category index, the top confidence score, and the full all_confidences map as a JSONB column. This record is the authoritative per-article classification artifact.
Redis cache write: after the database writes succeed, the consumer sets the deduplication Redis key to the resolved category value with a SETEX TTL of 3 hours (10800 seconds). This is the key checked at the start of the dedup flow.
Downstream Emission
With the Redis key set, the consumer emits a processed_content_aa Kafka message containing id, model_kategori, and confidence. This event is what downstream services — such as the API or analytics consumers — use to learn that an article has been classified. Emission happens only on the success path; skipped and failed messages produce no output event.
Analytics Recording
A consumer_analytics row is recorded for two classes of messages: validation failures (Zod parse errors) and any message that passes deduplication and reaches the gRPC inference stage — regardless of whether inference succeeded or failed. Messages that are skipped by the deduplication check (both Redis hits and Postgres hits) exit early and produce no analytics row.
The row records the source ID, the total processing time in milliseconds measured from the start of process(), a boolean success flag, and an error_type string when applicable. Errors from gRPC that contain the strings UNAVAILABLE, UNKNOWN, or grpc in their message are classified as "grpc"; all others are classified as "internal".