Integrating Vector Data with Langchain4j, PostgreSQL, and Liquibase in Spring Boot

This article dives into the integration of Langchain4j, PostgreSQL, and Liquibase within a Spring Boot environment, tailored specifically for Java developers. Combining Langchain4j's embedding-store abstractions with PostgreSQL's pgvector extension and Liquibase's schema version control creates a powerful ecosystem for managing vector data. This guide walks you through setting up these integrations, detailing the configuration and code needed to manage complex vector data. By implementing them, developers can significantly improve the performance and scalability of their data-driven applications, paving the way for advanced analytics and machine learning workloads.

Setting Up the Project Environment

Initiating a Spring Boot project with Gradle is the first step in integrating Langchain4j, PostgreSQL, and Liquibase. To start, create a new Spring Boot project, either through Spring Initializr or by setting up a Gradle project manually. Ensure your build.gradle file includes the necessary dependencies for Spring Boot and any other tools you plan to use.

Next, incorporate the Liquibase dependency into your Gradle build script: adding implementation 'org.liquibase:liquibase-core' to the dependencies block of build.gradle integrates Liquibase into the project. This setup is crucial for managing database migrations and schema changes, allowing you to version-control your database schema alongside your application code.
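
For orientation, a build.gradle along these lines pulls in the pieces used later in this article. This is a minimal sketch: the plugin and library versions are assumptions, not taken from the original project, so check Maven Central for current ones.

plugins {
    id 'java'
    id 'org.springframework.boot' version '3.2.0' // version is an assumption
    id 'io.spring.dependency-management' version '1.1.4' // version is an assumption
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-jdbc'
    implementation 'org.liquibase:liquibase-core'
    implementation 'dev.langchain4j:langchain4j:0.24.0' // version is an assumption
    implementation 'com.pgvector:pgvector:0.1.3' // pgvector JDBC support; version is an assumption
    runtimeOnly 'org.postgresql:postgresql'
    // Lombok is used by the embedding store class shown later (@Slf4j)
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
}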

This foundational setup lays the groundwork for a robust application, capable of handling complex data management tasks with ease and efficiency.

Liquibase Configuration Explained

Liquibase, integrated into a Spring Boot environment, excels in managing and applying database schema changes. The YAML configurations provided play a critical role in this process:

  1. Master Changelog Configuration (db.changelog-master.yaml): This file acts as the root for all schema change logs. It includes references to other YAML files that define specific database changes. For example:
databaseChangeLog:
  - include:
      file: history/001-enable-extension.yaml
      relativeToChangelogFile: true
      description: enable vector extension
  - include:
      file: history/002-schema-init.yaml
      relativeToChangelogFile: true
      description: schema init

It references two critical changes: enabling the vector extension and initializing the schema.

  2. Enabling Vector Extension (001-enable-extension.yaml): This file contains a changeSet that creates the vector extension if it doesn’t already exist. This is essential for working with vector data types in PostgreSQL.
databaseChangeLog:
  - changeSet:
      id: 1700794119845-1
      author: samzhu (generated)
      changes:
        - sql:
            sql: CREATE extension IF NOT EXISTS vector;

  3. Schema Initialization (002-schema-init.yaml): This file outlines the initial schema for the dataset_embedding table, detailing columns for storing vector data, text, metadata, and other relevant information.
databaseChangeLog:
  - changeSet:
      id: 1700794119845-2
      author: samzhu (generated)
      changes:
        - createTable:
            columns:
              - column:
                  constraints:
                    nullable: false
                    primaryKey: true
                    primaryKeyName: pk_dataset_embedding_id
                  name: id
                  remarks: Unique identifier for each record, using UUID
                  type: VARCHAR(36)
              - column:
                  constraints:
                    nullable: false
                  name: dataset_id
                  remarks: Unique identifier for the dataset
                  type: VARCHAR(36)
              - column:
                  constraints:
                    nullable: false
                  name: file_name
                  remarks: Name of the file in the dataset
                  type: VARCHAR(60)
              - column:
                  constraints:
                    nullable: false
                  name: embedding_vector
                  remarks: Embedding vector of the file, stored using the vector type
                  type: VECTOR(1536)
              - column:
                  constraints:
                    nullable: false
                  name: document_text
                  remarks: Document text corresponding to the file
                  type: TEXT
              - column:
                  name: metadata
                  remarks: Stores additional metadata related to the file, using JSONB format
                  type: JSONB
              - column:
                  constraints:
                    nullable: false
                  name: version_count
                  remarks: Count of file versions
                  type: INTEGER
              - column:
                  constraints:
                    nullable: false
                  name: created_by
                  remarks: Username of the user who created the record
                  type: VARCHAR(60)
              - column:
                  constraints:
                    nullable: false
                  defaultValueComputed: CURRENT_TIMESTAMP
                  name: creation_timestamp
                  remarks: Timestamp of record creation, using TIMESTAMPTZ type
                  type: TIMESTAMP WITH TIME ZONE
              - column:
                  constraints:
                    nullable: false
                  name: modified_by
                  remarks: Username of the user who last modified the record
                  type: VARCHAR(60)
              - column:
                  constraints:
                    nullable: false
                  defaultValueComputed: CURRENT_TIMESTAMP
                  name: modification_timestamp
                  remarks: Timestamp of the last modification of the record, using TIMESTAMPTZ type
                  type: TIMESTAMP WITH TIME ZONE
            remarks: Used to store embedding vectors of the dataset, including embedding vectors of files, document text, and related metadata
            tableName: dataset_embedding

These configurations, combined with the application.yaml setup for database connectivity, complete the Liquibase setup in your Spring Boot project. They ensure that your database schema is managed efficiently, maintaining consistency and reliability in your application’s data layer.
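
The article refers to this application.yaml without showing it, so here is a minimal sketch of what it might look like. The connection URL and credentials are placeholders, and the change-log path shown is Spring Boot's default location for the Liquibase master changelog.

spring:
  datasource:
    url: jdbc:postgresql://localhost:5432/postgres  # placeholder: point at your PostgreSQL instance
    username: postgres  # placeholder
    password: postgres  # placeholder
  liquibase:
    change-log: classpath:db/changelog/db.changelog-master.yaml  # Spring Boot's default changelog location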

Langchain4j and Its Role in Vector Data Management

Langchain4j is a framework that brings advanced AI and Large Language Model (LLM) capabilities to Java applications. It offers a flexible architecture that allows easy integration and swapping of components such as LLM providers and embedding-store providers. Key features include data ingestion, autonomous agents, prompt templates, context-aware memory, structured outputs, and AI services. This makes Langchain4j well suited to managing vector data and complex AI functionality within Java environments, especially in applications that require intelligent data processing and interaction. For more detailed insights, explore Langchain4j on GitHub: https://github.com/langchain4j/langchain4j.

Deep Dive into the Code: SpringJdbcPgVectorEmbeddingStore Class

In the SpringJdbcPgVectorEmbeddingStore class, we explore functionalities essential for embedding management in a Java-Spring environment with PostgreSQL:

  1. Add Operations: The add methods, crucial for inserting new embeddings, use MapSqlParameterSource for SQL parameter handling. This demonstrates how to integrate complex object mapping and database interactions in Spring.

  2. Find Relevant Embeddings: The findRelevant method showcases advanced querying with PostgreSQL’s vector operations, fetching the embeddings closest to a reference vector. Its query uses pgvector’s cosine distance operator (<=>) and rescales the result as (2 - distance) / 2, yielding a relevance score between 0 and 1.

  3. Delete Operations: The deleteEmbeddingByFilename method highlights data management capabilities, allowing deletion of embeddings based on specific criteria.

The class encapsulates best practices in handling vector data within a Java-Spring context, providing a practical example of efficient database operations and vector data management.

For a complete understanding, here is the full code of the SpringJdbcPgVectorEmbeddingStore class:

import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

import org.postgresql.util.PGobject;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.jdbc.core.namedparam.SqlParameterSource;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.pgvector.PGvector;

import dev.langchain4j.data.document.Metadata;
import dev.langchain4j.data.embedding.Embedding;
import dev.langchain4j.data.segment.TextSegment;
import dev.langchain4j.store.embedding.EmbeddingMatch;
import dev.langchain4j.store.embedding.EmbeddingStore;
import lombok.extern.slf4j.Slf4j;

/**
 * A store for managing embeddings and text segments using PostgreSQL and pgvector.
 * It provides functionality to add, retrieve, and delete embeddings and their associated text segments.
 * Ref:
 * https://github.com/langchain4j/langchain4j/blob/main/langchain4j-pgvector/src/main/java/dev/langchain4j/store/embedding/pgvector/PgVectorEmbeddingStore.java
 * https://tembo.io/blog/vector-indexes-in-pgvector/
 * https://supabase.com/blog/increase-performance-pgvector-hnsw
 */
@Slf4j
public class SpringJdbcPgVectorEmbeddingStore implements EmbeddingStore<TextSegment> {

    private static final String SQL_INSERT_EMBEDDING = "INSERT INTO dataset_embedding " +
            "(id, dataset_id, file_name, embedding_vector, document_text, metadata, version_count, created_by, creation_timestamp, modified_by, modification_timestamp) "
            + "VALUES (:id, :datasetId, :fileName, :embeddingVector, :documentText, :metadata::jsonb, :versionCount, :createdBy, :creationTimestamp, :modifiedBy, :modificationTimestamp)";

    private static final String SQL_DELETE_EMBEDDING_BY_FILENAME = "DELETE FROM dataset_embedding WHERE file_name = :fileName AND dataset_id = :datasetId";

    // <=> is pgvector's cosine distance operator (range 0..2);
    // (2 - distance) / 2 rescales the distance into a relevance score between 0 and 1
    private static final String SQL_FIND_RELEVANT = "WITH temp AS (" +
            "SELECT (2 - (embedding_vector <=> :embeddingVector)) / 2 AS score, id, embedding_vector, document_text, metadata "
            + "FROM dataset_embedding WHERE dataset_id IN (:datasetIds)) "
            + "SELECT * FROM temp WHERE score >= :minScore ORDER BY score DESC LIMIT :maxResults";

    private ObjectMapper objectMapper;
    private NamedParameterJdbcTemplate jdbcTemplate;
    private List<String> datasetIds;

    /**
     * Constructs a new PgVectorEmbeddingStore with multiple dataset identifiers.
     *
     * @param datasetIds   A list of dataset identifiers.
     * @param jdbcTemplate The jdbcTemplate for database operations.
     */
    public SpringJdbcPgVectorEmbeddingStore(List<String> datasetIds, NamedParameterJdbcTemplate jdbcTemplate) {
        this.datasetIds = new ArrayList<>(datasetIds);
        this.jdbcTemplate = jdbcTemplate;
        this.objectMapper = new ObjectMapper();
        log.debug("PgVectorEmbeddingStore initialized with dataset identifiers: {}", datasetIds);
    }

    /**
     * Constructs a new PgVectorEmbeddingStore with a single dataset identifier.
     *
     * @param datasetId    A single dataset identifier.
     * @param jdbcTemplate The jdbcTemplate for database operations.
     */
    public SpringJdbcPgVectorEmbeddingStore(String datasetId, NamedParameterJdbcTemplate jdbcTemplate) {
        this(Collections.singletonList(datasetId), jdbcTemplate);
        log.debug("PgVectorEmbeddingStore initialized with single dataset identifier: {}", datasetId);
    }

    @Override
    public String add(Embedding embedding) {
        log.debug("Adding a single embedding to the database");
        String id = UUID.randomUUID().toString();

        try {
            MapSqlParameterSource params = createSqlParameterSource(
                    id,
                    this.datasetIds.get(0),
                    embedding,
                    null);
            jdbcTemplate.update(SQL_INSERT_EMBEDDING, params);
        } catch (JsonProcessingException e) {
            log.error("Error processing JSON", e);
            return null;
        }

        log.info("Added embedding with ID: {}", id);
        return id;
    }

    @Override
    public void add(String id, Embedding embedding) {
        log.debug("Adding a single embedding with provided ID: {}", id);

        try {
            MapSqlParameterSource params = createSqlParameterSource(
                    id,
                    this.datasetIds.get(0),
                    embedding,
                    null);
            jdbcTemplate.update(SQL_INSERT_EMBEDDING, params);
        } catch (JsonProcessingException e) {
            log.error("Error processing JSON", e);
        }

        log.info("Added embedding with ID: {}", id);
    }

    @Override
    public String add(Embedding embedding, TextSegment embedded) {
        log.debug("Adding an embedding with associated text segment to the database");
        String id = UUID.randomUUID().toString();

        try {
            MapSqlParameterSource params = createSqlParameterSource(
                    id,
                    this.datasetIds.get(0),
                    embedding,
                    embedded);
            jdbcTemplate.update(SQL_INSERT_EMBEDDING, params);
        } catch (JsonProcessingException e) {
            log.error("Error processing JSON", e);
            return null;
        }

        log.info("Added embedding with ID: {}", id);
        return id;
    }

    @Override
    public List<String> addAll(List<Embedding> embeddings) {
        log.debug("Adding a list of embeddings to the database");
        if (embeddings.isEmpty()) {
            log.info("No embeddings to add.");
            return Collections.emptyList();
        }

        if (!hasSingleDatasetIdentifier()) {
            log.info("Operation requires a single dataset identifier.");
            return Collections.emptyList();
        }

        List<TextSegment> textSegments = Collections.nCopies(embeddings.size(), null);
        return addAll(embeddings, textSegments);
    }

    /**
     * Adds a list of embeddings and their associated text segments to the database.
     *
     * @param embeddings   The list of embeddings to add.
     * @param textSegments The list of text segments associated with each embedding.
     * @return A list of generated unique identifiers for the added embeddings.
     */
    @Override
    public List<String> addAll(List<Embedding> embeddings, List<TextSegment> textSegments) {
        log.debug("Adding a list of embeddings and associated text segments to the database");
        if (embeddings.isEmpty() || textSegments.isEmpty()) {
            log.info("No embeddings or text segments to add.");
            return Collections.emptyList();
        }

        if (!hasSingleDatasetIdentifier()) {
            log.info("Operation requires a single dataset identifier.");
            return Collections.emptyList();
        }

        List<MapSqlParameterSource> paramsForBatchInsert = prepareBatchInsertParams(embeddings, textSegments);

        List<String> generatedIds = new ArrayList<>();

        for (MapSqlParameterSource params : paramsForBatchInsert) {
            generatedIds.add(params.getValue("id").toString());
        }

        jdbcTemplate.batchUpdate(SQL_INSERT_EMBEDDING, paramsForBatchInsert.toArray(new SqlParameterSource[0]));
        log.info("Batch insert of embeddings and text segments completed");
        return generatedIds;
    }

    /**
     * Finds embeddings that are most relevant (closest in space) to a provided
     * reference embedding.
     *
     * @param referenceEmbedding The embedding to use as a reference. The returned
     *                           embeddings should be relevant (closest) to this.
     * @param maxResults         The maximum number of embeddings to return.
     * @param minScore           The minimum relevance score, ranging from 0 to 1
     *                           (inclusive). Only embeddings with a score of this
     *                           value or higher will be returned.
     * @return A list of EmbeddingMatch objects. Each EmbeddingMatch includes a
     *         relevance score (derived from cosine distance), ranging from 0 (not
     *         relevant) to 1 (highly relevant).
     */
    @Override
    public List<EmbeddingMatch<TextSegment>> findRelevant(Embedding referenceEmbedding, int maxResults,
            double minScore) {
        log.debug("Finding embeddings relevant to a given reference embedding");

        MapSqlParameterSource parameters = new MapSqlParameterSource()
                .addValue("embeddingVector", new PGvector(referenceEmbedding.vector()))
                .addValue("datasetIds", this.datasetIds)
                .addValue("minScore", minScore)
                .addValue("maxResults", maxResults);

        return jdbcTemplate.query(SQL_FIND_RELEVANT, parameters, new RowMapper<EmbeddingMatch<TextSegment>>() {
            @Override
            public EmbeddingMatch<TextSegment> mapRow(ResultSet rs, int rowNum) throws SQLException {
                double score = rs.getDouble("score");
                String embeddingId = rs.getString("id");
                PGobject embeddingVectorObj = (PGobject) rs.getObject("embedding_vector");
                PGvector vector = new PGvector(embeddingVectorObj.getValue());
                Embedding embedding = new Embedding(vector.toArray());
                String text = rs.getString("document_text");
                String metadataJson = Optional.ofNullable(rs.getString("metadata")).orElse("{}");
                Map<String, String> metadataMap;
                Metadata metadata;
                try {
                    metadataMap = objectMapper.readValue(metadataJson, new TypeReference<Map<String, String>>() {
                    });
                    metadata = new Metadata(metadataMap);
                } catch (JsonProcessingException e) {
                    log.error("Error processing JSON", e);
                    return null;
                }
                TextSegment textSegment = text == null || text.isEmpty() ? null : TextSegment.from(text, metadata);
                return new EmbeddingMatch<>(score, embeddingId, embedding, textSegment);
            }
        });
    }

    /**
     * Deletes embeddings from the database based on filename.
     *
     * @param filename The filename of the embeddings to be deleted.
     */
    public void deleteEmbeddingByFilename(String filename) {
        log.debug("Deleting embeddings from the database based on filename");
        if (!hasSingleDatasetIdentifier()) {
            log.info("Operation requires a single dataset identifier.");
            return;
        }
        MapSqlParameterSource params = new MapSqlParameterSource()
                .addValue("fileName", filename)
                .addValue("datasetId", this.datasetIds.get(0));

        jdbcTemplate.update(SQL_DELETE_EMBEDDING_BY_FILENAME, params);
        log.info("Embeddings with filename {} have been deleted", filename);
    }

    /**
     * Prepares the batch insert parameters for adding embeddings and text segments
     * to the database.
     *
     * @param embeddings   The list of embeddings to be added.
     * @param textSegments The list of text segments associated with each embedding.
     * @return A list of MapSqlParameterSource objects for batch insertion.
     */
    private List<MapSqlParameterSource> prepareBatchInsertParams(List<Embedding> embeddings,
            List<TextSegment> textSegments) {
        log.debug("Preparing batch insert parameters for embeddings and text segments");
        List<MapSqlParameterSource> paramsList = new ArrayList<>();
        for (int i = 0; i < embeddings.size(); ++i) {
            Embedding embedding = embeddings.get(i);
            TextSegment textSegment = textSegments.get(i);
            try {
                MapSqlParameterSource params = createSqlParameterSource(
                        UUID.randomUUID().toString(),
                        this.datasetIds.get(0),
                        embedding,
                        textSegment);
                paramsList.add(params);
            } catch (JsonProcessingException e) {
                log.error("Error processing JSON", e);
            }
        }
        return paramsList;
    }

    /**
     * Creates and initializes a MapSqlParameterSource object with parameters for
     * SQL operations.
     *
     * @param id          The unique identifier for the embedding.
     * @param datasetId   The identifier of the dataset.
     * @param embedding   The embedding object to be stored in the database.
     * @param textSegment The text segment associated with the embedding, may be null.
     * @return A MapSqlParameterSource object populated with the required parameters.
     * @throws JsonProcessingException If an error occurs during JSON serialization.
     */
    private MapSqlParameterSource createSqlParameterSource(String id, String datasetId, Embedding embedding,
            TextSegment textSegment) throws JsonProcessingException {
        log.debug("Creating SQL parameter source for embedding ID: {}", id);

        // Guard against a null text segment: the add(Embedding) overloads pass null
        // here, which would otherwise cause a NullPointerException
        Map<String, String> metadataMap = textSegment == null
                ? new HashMap<>()
                : new HashMap<>(textSegment.metadata().asMap());
        // Convert metadata to JSON string
        String metadataJson = this.objectMapper.writeValueAsString(metadataMap);

        // Create and populate parameters for SQL operation; an empty string keeps
        // the NOT NULL constraint on document_text satisfied when no segment is given
        MapSqlParameterSource params = new MapSqlParameterSource()
                .addValue("id", id)
                .addValue("datasetId", datasetId)
                .addValue("fileName", metadataMap.get("file_name"))
                .addValue("embeddingVector", new PGvector(embedding.vector()))
                .addValue("documentText", textSegment == null ? "" : textSegment.text())
                .addValue("metadata", metadataJson)
                .addValue("versionCount", 1)
                .addValue("createdBy", "createdBy") // Consider using a dynamic value
                .addValue("creationTimestamp", LocalDateTime.now())
                .addValue("modifiedBy", "modifiedBy") // Consider using a dynamic value
                .addValue("modificationTimestamp", LocalDateTime.now());

        log.debug("SQL parameter source created for embedding ID: {}", id);
        return params;
    }

    private boolean hasSingleDatasetIdentifier() {
        return this.datasetIds.size() == 1;
    }
}

This code serves as a detailed guide for developers looking to implement similar functionalities in their Java applications.
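
To make the workflow concrete, here is a short, hypothetical usage sketch. The wrapper class, the dataset identifier, the file name, and the score threshold are illustrative assumptions; in a real application the vectors would come from an embedding model (the schema above expects 1536 dimensions).

import java.util.List;
import java.util.Map;

import javax.sql.DataSource;

import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;

import dev.langchain4j.data.document.Metadata;
import dev.langchain4j.data.embedding.Embedding;
import dev.langchain4j.data.segment.TextSegment;
import dev.langchain4j.store.embedding.EmbeddingMatch;

// Hypothetical usage sketch; names and values are placeholders
public class EmbeddingStoreExample {

    public void run(DataSource dataSource, float[] documentVector, Embedding queryEmbedding) {
        NamedParameterJdbcTemplate jdbcTemplate = new NamedParameterJdbcTemplate(dataSource);
        SpringJdbcPgVectorEmbeddingStore store =
                new SpringJdbcPgVectorEmbeddingStore("my-dataset-id", jdbcTemplate);

        // Store a segment together with its embedding; the file_name metadata
        // entry feeds the file_name column of dataset_embedding
        TextSegment segment = TextSegment.from("Some document text",
                new Metadata(Map.of("file_name", "example.txt")));
        String id = store.add(new Embedding(documentVector), segment); // documentVector: float[1536]

        // Retrieve the 5 closest matches with a relevance score of at least 0.7
        List<EmbeddingMatch<TextSegment>> matches = store.findRelevant(queryEmbedding, 5, 0.7);
        matches.forEach(m -> System.out.println(m.score() + " -> " + m.embedded().text()));
    }
}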

Leveraging Liquibase for Database Version Control

Liquibase plays a crucial role in managing database schema changes and versioning in a Spring Boot application. It ensures a systematic approach to database migrations, making it easier to track and apply schema changes across different environments and versions.

Through the provided YAML configurations, Liquibase enables specific schema operations such as enabling the vector extension (001-enable-extension.yaml) and initializing the database schema (002-schema-init.yaml). These changes are tracked and managed, ensuring consistency and ease of maintenance.

In conclusion, Liquibase’s integration into a Spring Boot project, illustrated by these practical examples, demonstrates its efficiency in managing database schemas and makes it an invaluable tool for maintaining and evolving database structures.

The integration of Langchain4j, PostgreSQL, Liquibase, and Spring Boot demonstrates a powerful combination for handling vector data in Java applications. This approach offers robust data management, efficient schema version control, and streamlined embedding operations. By leveraging these technologies, developers can manage complex data types more effectively, ensure database consistency across versions, and enhance overall application performance and scalability. This integration is particularly beneficial for data-intensive applications requiring sophisticated handling of vector data and database management.