06_SPR/03 -- Pipeline de embeddings de archivos.md

Pipeline de embeddings de archivos

La segunda línea del FileProcessorfileEmbeddingPort.embed(...)— es la que abre dwall-module-embeddings-files. Detrás de esa única invocación se desencadena todo el pipeline que convierte un byte[] en N vectores de 768 dimensiones persistidos en PostgreSQL: extracción de texto, descripción de imágenes con Gemini Vision, troceado del contenido, llamada al modelo de embeddings y escritura en pgvector. Este capítulo entra a esa caja.

La orquestación: FileEmbeddingService.process

El método process es la pieza central del módulo. Compone los seis puertos en una secuencia que cubre las tres fases del pipeline (extracción → preparación → embedding) y que mantiene la tabla mirror sincronizada con el progreso:

public void process(final Long fileId, final byte[] fileBytes, final String mimeType) {
    final FileId id = FileId.of(fileId);
    try {
        fileEmbeddingStatusRepository.markProcessing(id);

        final FileType fileType = FileType.fromMimeType(mimeType);
        final FileContent content = fileContentExtractor.extract(fileBytes, 
							        fileType);
        final List<String> pages = buildEnrichedPageTexts(content);
        final List<FileChunk> chunks = fileChunker.chunk(pages);
        final FileEmbedding embedding = createEmbedding(fileId, chunks);

        fileEmbeddingRepository.save(embedding);
        fileEmbeddingStatusRepository.markReady(id);
    } catch (final Exception e) {
        fileEmbeddingStatusRepository.markFailed(id, e.getMessage());
        log.error("[embeddings-files] failed processing file {}: {}", fileId, e.getMessage());
    }
}

El primer markProcessing inicia la transición de estado que el patrón choreography del capítulo anterior dejaba señalaba. A partir de ahí el flujo es lineal. FileType.fromMimeType mapea el MIME a uno de los tipos soportados, y el FileContentExtractor —que internamente es un FileContentExtractorDispatcher que enruta al extractor concreto— devuelve un FileContent estructurado por páginas, donde cada PageContent lleva su texto y la lista de imágenes embebidas que aparezcan en ella. La separación por páginas no es casual: permite que cada chunk recuerde de qué página viene, lo que más tarde habilita citar pasajes con referencia exacta en el RAG.

buildEnrichedPageTexts es donde el pipeline hace algo no obvio. En lugar de chunkear directamente el texto extraído, enriquece cada página con la descripción de las imágenes que contiene antes de trocear:

private String enrichPageWithImageDescriptions(final PageContent page) {
    if (page.images().isEmpty()) {
        return page.text();
    }
    final List<String> descriptions = 
					   imageDescriptionGenerator.describe(page.images());
    final StringBuilder merged = new StringBuilder(page.text());
    for (final String description : descriptions) {
        merged.append("\n[Imagen: ").append(description).append("]");
    }
    return merged.toString();
}

La consecuencia práctica es que un PDF con un diagrama del compresor IGE12 se vuelve buscable también por el contenido visual del diagrama, no solo por el texto que lo rodea. Si el operador pregunta por "esquema del compresor IGE12", el chunk que mezcla la descripción Vision con el texto adyacente va a quedar geométricamente cerca de la consulta aunque la palabra "esquema" no aparezca literalmente en el PDF original. Hacer esto antes de chunkear es deliberado: si lo hiciéramos después, las descripciones quedarían descontextualizadas del párrafo que las rodea.

El FileChunker reparte cada página enriquecida en fragmentos de hasta 1000 caracteres con 150 de solape, partiendo recursivamente por separadores cada vez más finos (\n\n, \n, . , ) hasta caber en el límite. El solape garantiza que ningún concepto existente entre dos chunks se pierda en la búsqueda.

createEmbedding es la última pieza del bloque: pasa los textos de todos los chunks al FileEmbeddingGenerator —que internamente es GeminiFileEmbeddingClient— en una sola llamada batch, asigna a cada chunk el vector que le corresponde, y construye el agregado FileEmbedding. El batch es importante: pedirle a Gemini 30 vectores en una llamada cuesta lo mismo en latencia que pedirle uno, y mucho menos que pedirle 30 por separado.

El save final delega en FileEmbeddingRepository, que dentro hace un delete previo de los chunks anteriores asociados al fileId antes del batch insert — un replace atómico que asegura que el archivo nunca queda con chunks de dos versiones distintas a la vez. Y el markReady cierra la transición de estado en la tabla mirror.

Si cualquier paso lanza una excepción, el catch la captura, marca el estado como FAILED con el mensaje de error guardado en la columna error_message, y deja el log para inspección. El archivo queda en GCS y registrado en module_files_file, pero sin chunks indexados — la siguiente reintento puede reanudar el procesado sin tocar lo demás.

Esquema de datos: las tres tablas

De esta forma, manejamos tres tablas, repartidas en dos bounded contexts:

module_files_file es la tabla del agregado FileResource que ya vimos en el capítulo anterior. module_embeddings_files_file es la tabla mirror que el módulo de embeddings mantiene para llevar su propio ciclo de estados (PENDING, PROCESSING, COMPLETED, FAILED) sin acoplarse al estado de subida del archivo. Y module_embeddings_file_chunk es donde realmente vive el conocimiento útil: una fila por chunk, con su posición dentro del archivo, las páginas que abarca, el texto extraído y el vector de 768 dimensiones que lo representa en el espacio semántico.

Cada módulo gestiona su propio esquema y la integridad referencial se sostiene a nivel de aplicación, vía los listeners y los puertos de creación. Quitar mañana el módulo de embeddings del classpath dejaría las dos tablas inferiores huérfanas pero no rompería ninguna constraint del módulo de archivos.

Diagrama de secuencia del flujo completo

Con el caso de uso ya descrito, el flujo de extremo a extremo —desde el POST /files del cliente hasta el último vector escrito en pgvector— se ve así:

Lo más importante del diagrama está al principio: el cliente recibe el FileId antes de que arranque ni la subida a GCS ni el pipeline de embeddings. Todo el trabajo pesado vive en un hilo de fondo —Spring @Async en FileProcessor, y de nuevo @Async en el FileEmbeddingAdapter que implementa el puerto— por lo que la UX no paga el coste de esperar a que Gemini termine.

El siguiente capítulo aprovecha lo que este pipeline deja indexado y entra en cómo se consulta.