Updated cache in sockect request
This commit is contained in:
parent
50c9e10b74
commit
dcb7c54800
2 changed files with 120 additions and 7 deletions
|
@ -13,11 +13,13 @@ import db from '../db';
|
||||||
import { chats, messages } from '../db/schema';
|
import { chats, messages } from '../db/schema';
|
||||||
import { eq } from 'drizzle-orm';
|
import { eq } from 'drizzle-orm';
|
||||||
import crypto from 'crypto';
|
import crypto from 'crypto';
|
||||||
|
import redisClient from '../utils/redisClient';
|
||||||
|
|
||||||
type Message = {
|
type Message = {
|
||||||
messageId: string;
|
messageId: string;
|
||||||
chatId: string;
|
chatId: string;
|
||||||
content: string;
|
content: string;
|
||||||
|
cache: string;
|
||||||
};
|
};
|
||||||
|
|
||||||
type WSMessage = {
|
type WSMessage = {
|
||||||
|
@ -42,6 +44,8 @@ const handleEmitterEvents = (
|
||||||
ws: WebSocket,
|
ws: WebSocket,
|
||||||
messageId: string,
|
messageId: string,
|
||||||
chatId: string,
|
chatId: string,
|
||||||
|
cacheKey: string,
|
||||||
|
shouldCache: boolean,
|
||||||
) => {
|
) => {
|
||||||
let recievedMessage = '';
|
let recievedMessage = '';
|
||||||
let sources = [];
|
let sources = [];
|
||||||
|
@ -68,10 +72,13 @@ const handleEmitterEvents = (
|
||||||
sources = parsedData.data;
|
sources = parsedData.data;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
emitter.on('end', () => {
|
|
||||||
|
emitter.on('end', async () => {
|
||||||
ws.send(JSON.stringify({ type: 'messageEnd', messageId: messageId }));
|
ws.send(JSON.stringify({ type: 'messageEnd', messageId: messageId }));
|
||||||
|
|
||||||
db.insert(messages)
|
// Guardar el mensaje recibido en la base de datos
|
||||||
|
await db
|
||||||
|
.insert(messages)
|
||||||
.values({
|
.values({
|
||||||
content: recievedMessage,
|
content: recievedMessage,
|
||||||
chatId: chatId,
|
chatId: chatId,
|
||||||
|
@ -83,7 +90,23 @@ const handleEmitterEvents = (
|
||||||
}),
|
}),
|
||||||
})
|
})
|
||||||
.execute();
|
.execute();
|
||||||
|
|
||||||
|
// Almacenar la respuesta en caché si shouldCache es true
|
||||||
|
if (shouldCache) {
|
||||||
|
const responseWithSources = {
|
||||||
|
content: recievedMessage,
|
||||||
|
chatId: chatId,
|
||||||
|
messageId: messageId,
|
||||||
|
role: 'assistant',
|
||||||
|
metadata: JSON.stringify(sources),
|
||||||
|
};
|
||||||
|
await redisClient
|
||||||
|
.setEx(cacheKey, 86400, JSON.stringify(responseWithSources))
|
||||||
|
.then(() => logger.info(`Cache set for ${cacheKey}`))
|
||||||
|
.catch((err) => logger.error(`Redis setEx error: ${err}`));
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
emitter.on('error', (data) => {
|
emitter.on('error', (data) => {
|
||||||
const parsedData = JSON.parse(data);
|
const parsedData = JSON.parse(data);
|
||||||
ws.send(
|
ws.send(
|
||||||
|
@ -105,7 +128,6 @@ export const handleMessage = async (
|
||||||
try {
|
try {
|
||||||
const parsedWSMessage = JSON.parse(message) as WSMessage;
|
const parsedWSMessage = JSON.parse(message) as WSMessage;
|
||||||
const parsedMessage = parsedWSMessage.message;
|
const parsedMessage = parsedWSMessage.message;
|
||||||
|
|
||||||
const id = crypto.randomBytes(7).toString('hex');
|
const id = crypto.randomBytes(7).toString('hex');
|
||||||
|
|
||||||
if (!parsedMessage.content)
|
if (!parsedMessage.content)
|
||||||
|
@ -117,6 +139,69 @@ export const handleMessage = async (
|
||||||
}),
|
}),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
const cacheKey = parsedMessage.content;
|
||||||
|
const shouldCache = parsedMessage.cache === '1';
|
||||||
|
|
||||||
|
if (shouldCache) {
|
||||||
|
const cachedResponse = await redisClient.get(cacheKey);
|
||||||
|
|
||||||
|
if (cachedResponse) {
|
||||||
|
const jsonDatabase = JSON.parse(cachedResponse);
|
||||||
|
|
||||||
|
ws.send(
|
||||||
|
JSON.stringify({
|
||||||
|
type: 'message',
|
||||||
|
data: jsonDatabase.content,
|
||||||
|
messageId: jsonDatabase.messageId,
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
const sources = JSON.parse(jsonDatabase.metadata);
|
||||||
|
ws.send(
|
||||||
|
JSON.stringify({
|
||||||
|
type: 'sources',
|
||||||
|
data: sources,
|
||||||
|
messageId: jsonDatabase.messageId,
|
||||||
|
cache: true,
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
await db
|
||||||
|
.insert(chats)
|
||||||
|
.values({
|
||||||
|
id: parsedMessage.chatId,
|
||||||
|
title: parsedMessage.content,
|
||||||
|
createdAt: new Date().toString(),
|
||||||
|
focusMode: parsedWSMessage.focusMode,
|
||||||
|
})
|
||||||
|
.execute();
|
||||||
|
await db
|
||||||
|
.insert(messages)
|
||||||
|
.values({
|
||||||
|
content: parsedMessage.content,
|
||||||
|
chatId: parsedMessage.chatId,
|
||||||
|
messageId: id,
|
||||||
|
role: 'user',
|
||||||
|
metadata: JSON.stringify({
|
||||||
|
createdAt: new Date(),
|
||||||
|
}),
|
||||||
|
})
|
||||||
|
.execute();
|
||||||
|
await db
|
||||||
|
.insert(messages)
|
||||||
|
.values({
|
||||||
|
content: jsonDatabase.content,
|
||||||
|
chatId: parsedMessage.chatId,
|
||||||
|
messageId: id,
|
||||||
|
role: jsonDatabase.role,
|
||||||
|
metadata: JSON.stringify({
|
||||||
|
createdAt: new Date(),
|
||||||
|
...(sources && sources.length > 0 && { sources }),
|
||||||
|
}),
|
||||||
|
})
|
||||||
|
.execute();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
const history: BaseMessage[] = parsedWSMessage.history.map((msg) => {
|
const history: BaseMessage[] = parsedWSMessage.history.map((msg) => {
|
||||||
if (msg[0] === 'human') {
|
if (msg[0] === 'human') {
|
||||||
return new HumanMessage({
|
return new HumanMessage({
|
||||||
|
@ -140,7 +225,14 @@ export const handleMessage = async (
|
||||||
embeddings,
|
embeddings,
|
||||||
);
|
);
|
||||||
|
|
||||||
handleEmitterEvents(emitter, ws, id, parsedMessage.chatId);
|
handleEmitterEvents(
|
||||||
|
emitter,
|
||||||
|
ws,
|
||||||
|
id,
|
||||||
|
parsedMessage.chatId,
|
||||||
|
cacheKey,
|
||||||
|
shouldCache,
|
||||||
|
);
|
||||||
|
|
||||||
const chat = await db.query.chats.findFirst({
|
const chat = await db.query.chats.findFirst({
|
||||||
where: eq(chats.id, parsedMessage.chatId),
|
where: eq(chats.id, parsedMessage.chatId),
|
||||||
|
|
|
@ -215,7 +215,7 @@ const loadMessages = async (
|
||||||
const chatsIdUrl = new URL(
|
const chatsIdUrl = new URL(
|
||||||
`${process.env.NEXT_PUBLIC_API_URL}/chats/${chatId}`,
|
`${process.env.NEXT_PUBLIC_API_URL}/chats/${chatId}`,
|
||||||
);
|
);
|
||||||
chatsIdUrl.searchParams.append('cache', '1');
|
//chatsIdUrl.searchParams.append('cache', '1');
|
||||||
const res = await fetch(chatsIdUrl, {
|
const res = await fetch(chatsIdUrl, {
|
||||||
method: 'GET',
|
method: 'GET',
|
||||||
headers: {
|
headers: {
|
||||||
|
@ -275,7 +275,6 @@ const ChatWindow = ({ id }: { id?: string }) => {
|
||||||
|
|
||||||
const [chatHistory, setChatHistory] = useState<[string, string][]>([]);
|
const [chatHistory, setChatHistory] = useState<[string, string][]>([]);
|
||||||
const [messages, setMessages] = useState<Message[]>([]);
|
const [messages, setMessages] = useState<Message[]>([]);
|
||||||
|
|
||||||
const [focusMode, setFocusMode] = useState('webSearch');
|
const [focusMode, setFocusMode] = useState('webSearch');
|
||||||
|
|
||||||
const [isMessagesLoaded, setIsMessagesLoaded] = useState(false);
|
const [isMessagesLoaded, setIsMessagesLoaded] = useState(false);
|
||||||
|
@ -343,6 +342,7 @@ const ChatWindow = ({ id }: { id?: string }) => {
|
||||||
message: {
|
message: {
|
||||||
chatId: chatId!,
|
chatId: chatId!,
|
||||||
content: message,
|
content: message,
|
||||||
|
cache: '1',
|
||||||
},
|
},
|
||||||
focusMode: focusMode,
|
focusMode: focusMode,
|
||||||
history: [...chatHistory, ['human', message]],
|
history: [...chatHistory, ['human', message]],
|
||||||
|
@ -371,6 +371,26 @@ const ChatWindow = ({ id }: { id?: string }) => {
|
||||||
|
|
||||||
if (data.type === 'sources') {
|
if (data.type === 'sources') {
|
||||||
sources = data.data;
|
sources = data.data;
|
||||||
|
if (typeof sources === 'string') {
|
||||||
|
sources = JSON.parse(data.data);
|
||||||
|
added = false;
|
||||||
|
setLoading(false);
|
||||||
|
}
|
||||||
|
if (data.cache) {
|
||||||
|
{
|
||||||
|
setMessages((prevMessages) => [
|
||||||
|
prevMessages[0],
|
||||||
|
{
|
||||||
|
...prevMessages[1],
|
||||||
|
sources:
|
||||||
|
typeof sources === 'string' ? JSON.parse(sources) : sources,
|
||||||
|
},
|
||||||
|
]);
|
||||||
|
added = true;
|
||||||
|
}
|
||||||
|
setLoading(false);
|
||||||
|
}
|
||||||
|
|
||||||
if (!added) {
|
if (!added) {
|
||||||
setMessages((prevMessages) => [
|
setMessages((prevMessages) => [
|
||||||
...prevMessages,
|
...prevMessages,
|
||||||
|
@ -379,7 +399,8 @@ const ChatWindow = ({ id }: { id?: string }) => {
|
||||||
messageId: data.messageId,
|
messageId: data.messageId,
|
||||||
chatId: chatId!,
|
chatId: chatId!,
|
||||||
role: 'assistant',
|
role: 'assistant',
|
||||||
sources: sources,
|
sources:
|
||||||
|
typeof sources === 'string' ? JSON.parse(sources) : sources,
|
||||||
createdAt: new Date(),
|
createdAt: new Date(),
|
||||||
},
|
},
|
||||||
]);
|
]);
|
||||||
|
|
Loading…
Add table
Reference in a new issue