From e9c1d1815fb0b679781f169923832c5a3301e7d6 Mon Sep 17 00:00:00 2001 From: Tobias Weise Date: Mon, 9 Sep 2024 16:18:32 +0200 Subject: [PATCH] app factory and more --- .gitignore | 6 +- backend/app.py | 952 ++++++++++++++++++---------------- backend/lib/chatbot.py | 20 +- backend/lib/knowledge.py | 38 ++ backend/lib/logging.py | 68 +++ backend/lib/models.py | 140 +++-- deployment/docker-compose.yml | 35 ++ deployment/sample.env | 5 +- 8 files changed, 747 insertions(+), 517 deletions(-) create mode 100644 backend/lib/knowledge.py create mode 100644 backend/lib/logging.py diff --git a/.gitignore b/.gitignore index 577ec22..b160d8b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,10 @@ backend/__pycache__/ ollama -deployment/ollama *.env deployment/*.env +deployment/ollama +# neo4j +deployment/data +deployment/logs + diff --git a/backend/app.py b/backend/app.py index 0fed00d..655d8eb 100644 --- a/backend/app.py +++ b/backend/app.py @@ -1,20 +1,14 @@ -""" -OpenAPI access via http://localhost:5000/openapi/ on local docker-compose deployment -""" +__version__ = "0.4.0" #------std lib modules:------- import os, sys, json, time import os.path from typing import Any, Tuple, List, Dict, Any, Callable, Optional, Union from datetime import datetime, date -import logging from functools import wraps #-------ext libs-------------- - -#from elasticsearch import NotFoundError, Elasticsearch # for normal read/write without vectors -#from elasticsearch_dsl import A, Document, Date, Integer, Keyword, Float, Long, Text, connections from elasticsearch_dsl import connections from pydantic import BaseModel, Field @@ -24,18 +18,20 @@ import jwt as pyjwt from flask import Flask, send_from_directory, send_file, Response, request, jsonify from flask_openapi3 import Info, Tag, OpenAPI, Server #FileStorage from flask_socketio import SocketIO, join_room, leave_room, rooms, send -#from werkzeug.utils import secure_filename +from werkzeug.utils import secure_filename import asyncio +import 
logging #----------home grown-------------- from lib.funcs import group_by from lib.elastictools import get_by_id, wait_for_elasticsearch -from lib.models import init_indicies, Chatbot, User, Text -from lib.chatbot import ask_bot, ask_bot2, train_text, download_llm +from lib.models import init_indicies, Chatbot, User, Text, Question, Answer +from lib.chatbot import ask_bot, train_text, download_llm from lib.speech import text_to_speech from lib.mail import send_mail from lib.user import hash_password, create_user, create_default_users +from lib.logging import ElasticsearchLogHandler, get_log_level BOT_ROOT_PATH = os.getenv("BOT_ROOT_PATH") @@ -51,53 +47,26 @@ jwt_secret = os.getenv("SECRET") assert jwt_secret -# JWT Bearer Sample -jwt = { - "type": "http", - "scheme": "bearer", - "bearerFormat": "JWT" -} -security_schemes = {"jwt": jwt} -security = [{"jwt": []}] +class MeasureTime: + + def __init__(self, name, logger=logging.getLogger()): + self.__start = datetime.now().timestamp() + self.__logger = logger + self.__name = name -def get_module_versions(): - with open("requirements.txt", "r") as f: - modules = {line.split("#")[0] for line in f.read().split("\n") if line.split("#")[0] != ""} - with open("current_requirements.txt", "r") as f: - d = {k: v for (k, v) in [line.split("#")[0].split("==") for line in f.read().split("\n") if line.split("#")[0] != ""]} - return {k: v for k, v in d.items() if k in modules} + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + duration = round(datetime.now().timestamp() - self.__start, 2) + self.__logger.info(f"{self.__name} duration: {duration} seconds") + print(f"{self.__name} duration: {duration} seconds", flush=True) + return False -import multiprocessing -cpus = multiprocessing.cpu_count() - - -info = Info( - title="CreativeBots-API", - version="1.0.0", - summary="The REST-API to manage bots, users and more!", - description="CPUs: " + str(cpus) + "
" + json.dumps(get_module_versions(), indent=4).replace("\n", "
") -) -servers = [ - Server(url=BOT_ROOT_PATH ) -] - -class NotFoundResponse(BaseModel): - code: int = Field(-1, description="Status Code") - message: str = Field("Resource not found!", description="Exception Information") - -app = OpenAPI( - __name__, - info=info, - servers=servers, - responses={404: NotFoundResponse}, - security_schemes=security_schemes - ) - - -def uses_jwt(required=True): +def uses_jwt(logger=None, required=True): """ Wraps routes in a jwt-required logic and passes decoded jwt and user from elasticsearch to the route as keyword """ @@ -108,6 +77,11 @@ def uses_jwt(required=True): def non_param_deco(f): @wraps(f) def decorated_route(*args, **kwargs): + + #app.logger.info(f"Request from IP: '{request.remote_addr}'") + + #request.environ.get('HTTP_X_REAL_IP', request.remote_addr) + token = None if "Authorization" in request.headers: token = request.headers["Authorization"].split(" ")[1] @@ -156,476 +130,570 @@ def uses_jwt(required=True): -socket = SocketIO(app, cors_allowed_origins="*") -@socket.on('connect') -def sockcon(data): - """ - put every connection into it's own room - to avoid broadcasting messages - answer in callback only to room with sid - """ - room = request.sid + request.remote_addr - join_room(room) - socket.emit('backend response', {'msg': f'Connected to room {room} !', "room": room}) # looks like iOS needs an answer +def create_app(): + + # JWT Bearer Sample + jwt = { + "type": "http", + "scheme": "bearer", + "bearerFormat": "JWT" + } + security_schemes = {"jwt": jwt} + security = [{"jwt": []}] -class SocketMessage(BaseModel): - question: str = Field(None, description="Status Code") - system_prompt: str = Field(None, description="Status Code") - bot_id: str = Field(None, description="Status Code") - room: Union[str, None] # = Field(None, description="Status Code") + info = Info( + title="CreativeBots-API", + version=__version__, + summary="The REST-API to manage bots, users and more!", + description="" + ) + servers = [ + 
Server(url=BOT_ROOT_PATH ) + ] + class NotFoundResponse(BaseModel): + code: int = Field(-1, description="Status Code") + message: str = Field("Resource not found!", description="Exception Information") -#TODO: pydantic message type validation - -@socket.on('client message') -def handle_message(message): - - SocketMessage.model_validate(message) - - - #try: - room = message["room"] - question = message["question"] - system_prompt = message["system_prompt"] - bot_id = message["bot_id"] - - start = datetime.now().timestamp() - d = ask_bot2(system_prompt + " " + question, bot_id) - - def get_scores(*args): - score_docs = d["get_score_docs"]() - return score_docs - - - def do_streaming(*args): - start_stream = datetime.now().timestamp() - for chunk in d["answer_generator"](): - socket.emit('backend token', {'data': chunk, "done": False}, to=room) - stream_duration = round(datetime.now().timestamp() - start_stream, 2) - print("Stream duration: ", stream_duration, flush=True) - - - async def f(): - ls = await asyncio.gather( - asyncio.to_thread(get_scores, 1,2,3), - asyncio.to_thread(do_streaming, 1,2,3) + app = OpenAPI( + __name__, + info=info, + servers=servers, + responses={404: NotFoundResponse}, + security_schemes=security_schemes ) - return ls - [score_docs, _] = asyncio.run(f()) + logger = logging.getLogger() - socket.emit('backend token', { - 'done': True, - "score_docs": score_docs - }, to=room) + socket = SocketIO(app, cors_allowed_origins="*") - duration = round(datetime.now().timestamp() - start, 2) - print("Total duration: ", duration, flush=True) + @socket.on('connect') + def sockcon(data): + """ + put every connection into it's own room + to avoid broadcasting messages + answer in callback only to room with sid + """ + room = request.sid + request.remote_addr + join_room(room) + socket.emit('backend response', {'msg': f'Connected to room {room} !', "room": room}) # looks like iOS needs an answer + + + class SocketMessage(BaseModel): + question: str = 
Field(None, description="Status Code") + system_prompt: str = Field(None, description="Status Code") + bot_id: str = Field(None, description="Status Code") + room: Union[str, None] # = Field(None, description="Status Code") + + + #TODO: pydantic message type validation + + @socket.on('client message') + def handle_message(message): + + SocketMessage.model_validate(message) + + logger.info("Starting stream") + + + #try: + room = message["room"] + question = message["question"] + system_prompt = message["system_prompt"] + bot_id = message["bot_id"] + + + q = Question() + q.text = question + q.md5 = hash_password(question) + q.save() + + + #start = datetime.now().timestamp() + + with MeasureTime("Streaming + Scoring docs -total", logger) as timer: + + d = ask_bot(system_prompt + " " + question, bot_id) + + def get_scores(*args): + score_docs = d["get_score_docs"]() + return score_docs + + + def do_streaming(*args): + start_stream = datetime.now().timestamp() + answer = "" + for chunk in d["answer_generator"](): + socket.emit('backend token', {'data': chunk, "done": False}, to=room) + answer += chunk + + stream_duration = round(datetime.now().timestamp() - start_stream, 2) + + logger.info("Stream duration: " + str(stream_duration)) + + a = Answer( + question_id=q.meta.id, + answer=answer, + md5=hash_password(answer) + ) + a.save() -#======================= TAGS ============================= + async def f(): + ls = await asyncio.gather( + asyncio.to_thread(get_scores, 1,2,3), + asyncio.to_thread(do_streaming, 1,2,3) + ) + return ls -not_implemented_tag = Tag(name='Not implemented', description='Functionality not yet implemented beyond an empty response') -debug_tag = Tag(name='Debug', description='Debug') -bot_tag = Tag(name='Bot', description='Bot') -user_tag = Tag(name='User', description='User') + [score_docs, _] = asyncio.run(f()) -#==============Routes=============== - -class LoginRequest(BaseModel): - email: str = Field(None, description='The users E-Mail that 
serves as nick too.') - password: str = Field(None, description='A short text by the user explaining the rating.') + socket.emit('backend token', { + 'done': True, + "score_docs": score_docs + }, to=room) -@app.post('/user/login', summary="", tags=[user_tag]) -def login(form: LoginRequest): - """ - Get your JWT to verify access rights - """ - if form.email is None or form.password is None: - msg = "Invalid password!" - app.logger.error(msg) - return jsonify({ - 'status': 'error', - 'message': msg - }), 400 + #duration = round(datetime.now().timestamp() - start, 2) + #logger.info("Total duration: " + str(duration)) - match get_by_id(index="user", id_field_name="email", id_value=form.email): - case []: - msg = "User with email '%s' doesn't exist!" % form.email - app.logger.error(msg) + + + + #======================= TAGS ============================= + + not_implemented_tag = Tag(name='Not implemented', description='Functionality not yet implemented beyond an empty response') + debug_tag = Tag(name='Debug', description='Debug') + bot_tag = Tag(name='Bot', description='Bot') + user_tag = Tag(name='User', description='User') + + #==============Routes=============== + + class LoginRequest(BaseModel): + email: str = Field(None, description='The users E-Mail that serves as nick too.') + password: str = Field(None, description='A short text by the user explaining the rating.') + + + @app.post('/user/login', summary="", tags=[user_tag]) + def login(form: LoginRequest): + """ + Get your JWT to verify access rights + """ + LoginRequest.model_validate(form) + + + if form.email is None or form.password is None: + msg = "Invalid password!" 
+ logger.error(msg) return jsonify({ 'status': 'error', 'message': msg }), 400 - case [user]: - if user["password_hash"] == hash_password(form.password + form.email): - token = pyjwt.encode({"email": form.email}, jwt_secret, algorithm="HS256") - #app.logger.info(token) - return jsonify({ - 'status': 'success', - 'jwt': token - }) - else: - msg = "Invalid password!" - app.logger.error(msg) + match get_by_id(index="user", id_field_name="email", id_value=form.email): + case []: + msg = "User with email '%s' doesn't exist!" % form.email + logger.error(msg) return jsonify({ 'status': 'error', 'message': msg }), 400 - -class RegisterRequest(BaseModel): - email: str = Field(None, description='The users E-Mail that serves as nick too.') - password: str = Field(None, description='A short text by the user explaining the rating.') + case [user]: + if user["password_hash"] == hash_password(form.password + form.email): + token = pyjwt.encode({"email": form.email}, jwt_secret, algorithm="HS256") + #logger.info(token) + return jsonify({ + 'status': 'success', + 'jwt': token + }) + else: + msg = "Invalid password!" + logger.error(msg) + return jsonify({ + 'status': 'error', + 'message': msg + }), 400 -@app.post('/user/register', summary="", tags=[user_tag]) -def register(form: RegisterRequest): - """ - Register an account - """ - - if form.email is None or form.password is None: - msg = "Parameters missing!" - app.logger.error(msg) - return jsonify({ - 'status': 'error', - 'message': msg - }), 400 + class RegisterRequest(BaseModel): + email: str = Field(None, description='The users E-Mail that serves as nick too.') + password: str = Field(None, description='A short text by the user explaining the rating.') - if User.get(id=form.email, ignore=404) is not None: - return jsonify({ - 'status': 'error', - "message": "User with that e-mail address already exists!" 
- }) - - else: - user = User(meta={'id': form.email}) - user.creation_date = datetime.now() - user.email = form.email - user.password_hash = hash_password(form.password + form.email) - user.role = "User" - user.isEmailVerified = False - user.save() - - msg = """ -

Verify E-Mail

- - Hi! - - Please click on the following link to verify your e-mail: - - - Click here! - + @app.post('/user/register', summary="", tags=[user_tag]) + def register(form: RegisterRequest): + """ + Register an account """ - send_mail(user.email, "User registration @ Creative Bots", "Creative Bots", msg) + if form.email is None or form.password is None: + msg = "Parameters missing!" + logger.error(msg) + return jsonify({ + 'status': 'error', + 'message': msg + }), 400 + + + if User.get(id=form.email, ignore=404) is not None: + return jsonify({ + 'status': 'error', + "message": "User with that e-mail address already exists!" + }) + + else: + user = User(meta={'id': form.email}) + user.creation_date = datetime.now() + user.email = form.email + user.password_hash = hash_password(form.password + form.email) + user.role = "User" + user.isEmailVerified = False + user.save() + + msg = """ +

Verify E-Mail

+ + Hi! + + Please click on the following link to verify your e-mail: + + + Click here! + + """ + + send_mail(user.email, "User registration @ Creative Bots", "Creative Bots", msg) + + return jsonify({ + 'status': 'success' + }) + + + + #-----bot routes------ + + + class GetSpeechRequest(BaseModel): + text: str = Field(None, description="Some text to convert to mp3") + + @app.post('/text2speech', summary="", tags=[], security=security) + def text2speech(form: GetSpeechRequest): + + with MeasureTime("text2speech file", logger) as timer: + file_name = text_to_speech(form.text) + + logger.info("Created '%s' !" % file_name) return jsonify({ - 'status': 'success' + "status": "success", + "file": "/" + file_name }) -#-----bot routes------ + #============ Bot CRUD =============== + + class CreateBotRequest(BaseModel): + name: str = Field(None, description="The bot's name") + visibility: str = Field('private', description="The bot's visibility to other users ('private', 'public')") + description: str = Field('', description="The bot's description of purpose and being") + system_prompt: str = Field('', description="The bot's defining system prompt") + llm_model: str = Field("llama3", description="The bot's used LLM") + + #status = Keyword() + #temperature = Float() -class GetSpeechRequest(BaseModel): - text: str = Field(None, description="Some text to convert to mp3") + @app.post('/bot', summary="", tags=[bot_tag], security=security) + @uses_jwt() + def create_bot(form: CreateBotRequest, decoded_jwt, user): + """ + Creates a chatbot for the JWT associated user. 
+ """ + CreateBotRequest.model_validate(form) -@app.post('/text2speech', summary="", tags=[], security=security) -def text2speech(form: GetSpeechRequest): - file_name = text_to_speech(form.text) - return jsonify({ - "status": "success", - "file": "/" + file_name - }) + bot = Chatbot( + name=form.name, + visibility=form.visibility, + description=form.description, + system_prompt=form.system_prompt, + llm_model=form.llm_model + ) + #add meta data + bot.creation_date = datetime.now() + bot.creator_id = user.meta.id + bot.save() -#============ Bot CRUD =============== - -class CreateBotRequest(BaseModel): - name: str = Field(None, description="The bot's name") - visibility: str = Field('private', description="The bot's visibility to other users ('private', 'public')") - description: str = Field('', description="The bot's description of purpose and being") - system_prompt: str = Field('', description="The bot's defining system prompt") - llm_model: str = Field("llama3", description="The bot's used LLM") - - #status = Keyword() - #temperature = Float() - - -@app.post('/bot', summary="", tags=[bot_tag], security=security) -@uses_jwt() -def create_bot(form: CreateBotRequest, decoded_jwt, user): - """ - Creates a chatbot for the JWT associated user. 
- """ - CreateBotRequest.model_validate(form) - - bot = Chatbot() - bot.name = form.name - bot.visibility = form.visibility - bot.description = form.description - bot.system_prompt = form.system_prompt - bot.llm_model = form.llm_model - - #add meta data - bot.creation_date = datetime.now() - bot.creator_id = user.meta.id - bot.save() - - return jsonify({ - "bot_id": bot.meta.id - }) + return jsonify({ + "bot_id": bot.meta.id + }) -class GetBotRequest(BaseModel): - id: str = Field(None, description="The bot's id") + class GetBotRequest(BaseModel): + id: str = Field(None, description="The bot's id") -@app.get('/bot', summary="", tags=[bot_tag], security=security) -@uses_jwt(required=False) -def get_bots(query: GetBotRequest, decoded_jwt, user): - """ - List all bots or one by id - """ - match query.id: - case None: - match user: - case None: - #get all public bots - ls = [] - for hit in Chatbot.search()[0:10000].execute(): - d = hit.to_dict() - if d["visibility"] == "public": - d["id"] = hit.meta.id - ls.append(d) - - return jsonify(ls) - - case _: - #get all user bots - ls = [] - for hit in Chatbot.search()[0:10000].execute(): - d = hit.to_dict() - if "creator_id" in d: - if user.meta.id == d["creator_id"]: + @app.get('/bot', summary="", tags=[bot_tag], security=security) + @uses_jwt(required=False) + def get_bots(query: GetBotRequest, decoded_jwt, user): + """ + List all bots or one by id + """ + match query.id: + case None: + match user: + case None: + #get all public bots + ls = [] + for hit in Chatbot.search()[0:10000].execute(): + d = hit.to_dict() + if d["visibility"] == "public": d["id"] = hit.meta.id ls.append(d) - return jsonify(ls) + return jsonify(ls) - case some_id: - match user: - case None: - bot = Chatbot.get(id=query.id) - if bot.visibility == "public": + case _: + #get all user bots + ls = [] + for hit in Chatbot.search()[0:10000].execute(): + d = hit.to_dict() + if "creator_id" in d: + if user.meta.id == d["creator_id"]: + d["id"] = hit.meta.id + 
ls.append(d) + + return jsonify(ls) + + case some_id: + match user: + case None: + bot = Chatbot.get(id=query.id) + if bot.visibility == "public": + d = bot.to_dict() + d["id"] = bot.meta.id + return jsonify(d) + else: + return jsonify(None) + case _: + bot = Chatbot.get(id=query.id) d = bot.to_dict() d["id"] = bot.meta.id return jsonify(d) - else: - return jsonify(None) - case _: - bot = Chatbot.get(id=query.id) - d = bot.to_dict() - d["id"] = bot.meta.id - return jsonify(d) - -class UpdateBotRequest(BaseModel): - id: str = Field(None, description="The bot's id") - name: str = Field(None, description="The bot's name") - visibility: str = Field(None, description="The bot's visibility to other users ('private', 'public')") - description: str = Field(None, description="The bot's description of purpose and being") - system_prompt: str = Field(None, description="The bot's defining system prompt") - llm_model: str = Field(None, description="The bot's used LLM") + class UpdateBotRequest(BaseModel): + id: str = Field(None, description="The bot's id") + name: str = Field(None, description="The bot's name") + visibility: str = Field(None, description="The bot's visibility to other users ('private', 'public')") + description: str = Field(None, description="The bot's description of purpose and being") + system_prompt: str = Field(None, description="The bot's defining system prompt") + llm_model: str = Field(None, description="The bot's used LLM") -@app.put('/bot', summary="", tags=[bot_tag], security=security) -@uses_jwt() -def update_bot(form: UpdateBotRequest, decoded_jwt, user): - """ - Change a chatbot via it's id - """ - bot = Chatbot.get(id=form.id) + @app.put('/bot', summary="", tags=[bot_tag], security=security) + @uses_jwt() + def update_bot(form: UpdateBotRequest, decoded_jwt, user): + """ + Change a chatbot via it's id + """ + bot = Chatbot.get(id=form.id) - bot.name = form.name - bot.visibility = form.visibility - bot.description = form.description - 
bot.system_prompt = form.system_prompt - bot.llm_model = form.llm_model + bot.name = form.name + bot.visibility = form.visibility + bot.description = form.description + bot.system_prompt = form.system_prompt + bot.llm_model = form.llm_model - #add meta data - bot.changed_date = datetime.now() - bot.save() + #add meta data + bot.changed_date = datetime.now() + bot.save() - return jsonify({ - "status": "success" - }) - - - - -class DeleteBotRequest(BaseModel): - id: str = Field(None, description="The bot's id") - -@app.delete('/bot', summary="", tags=[bot_tag], security=security) -@uses_jwt() -def delete_bot(form: DeleteBotRequest, decoded_jwt, user): - """ - Deletes a chatbot via it's id - """ - bot = Chatbot.get(id=form.id) - bot.delete() - return jsonify({ - "status": "success" - }) - - - -#============================================================================ - - -class AskBotRequest(BaseModel): - bot_id: str = Field(None, description="The bot's id") - question: str = Field(None, description="The question the bot should answer") - - -@app.get('/bot/ask', summary="", tags=[bot_tag], security=security) -@uses_jwt() -def query_bot(query: AskBotRequest, decoded_jwt, user): - """ - Asks a chatbot - """ - - bot_id = query.bot_id - question = query.question - - - start = datetime.now().timestamp() - - d = ask_bot2(system_prompt + " " + question, bot_id) - - def get_scores(*args): - score_docs = d["get_score_docs"]() - return score_docs - - - def do_streaming(*args): - start_stream = datetime.now().timestamp() - answer = "" - for chunk in d["answer_generator"](): - answer += chunk - - stream_duration = round(datetime.now().timestamp() - start_stream, 2) - print("Stream duration: ", stream_duration, flush=True) - return answer - - - async def f(): - ls = await asyncio.gather( - asyncio.to_thread(get_scores, 1,2,3), - asyncio.to_thread(do_streaming, 1,2,3) - ) - return ls - - [score_docs, answer] = asyncio.run(f()) - - duration = round(datetime.now().timestamp() - 
start, 2) - print("Total duration: ", duration, flush=True) - - app.logger.info(duration) - - return jsonify({ - "answer": answer, - "duration": str(duration), - #"docs": ls#, - "score_docs": xs - }) - - - - -#-----------------Embedding---------------------- - -class TrainTextRequest(BaseModel): - bot_id: str = Field(None, description="The bot's id") - text: str = Field(None, description="Some text") - - -@app.post('/bot/train/text', summary="", tags=[bot_tag], security=security) -@uses_jwt() -def upload(form: TrainTextRequest, decoded_jwt, user): - """ - Caution: Long running request! - """ - bot_id = form.bot_id - text = form.text - - # validate body - if not bot_id: return jsonify({ - 'status': 'error', - 'message': 'chatbotId is required' - }), 400 + "status": "success" + }) - if not text: + + + + class DeleteBotRequest(BaseModel): + id: str = Field(None, description="The bot's id") + + @app.delete('/bot', summary="", tags=[bot_tag], security=security) + @uses_jwt() + def delete_bot(form: DeleteBotRequest, decoded_jwt, user): + """ + Deletes a chatbot via it's id + """ + bot = Chatbot.get(id=form.id) + bot.delete() return jsonify({ - 'status': 'error', - 'message': 'No data source found' - }), 400 - - train_text(bot_id, text) - return jsonify({ - "status": "success" - }) - -#-------- non api routes ------------- - -@app.route("/") #Index Verzeichnis -def index(): - return send_from_directory('./public', "index.html") - - -@app.route('/') #generische Route (auch Unterordner) -def catchAll(path): - return send_from_directory('./public', path) - - -#import logging_loki + "status": "success" + }) -def create_app(): - LOG_LEVEL = os.getenv("LOG_LEVEL") - if LOG_LEVEL: - logging.basicConfig(level=eval("logging." 
+ LOG_LEVEL)) - else: - logging.basicConfig(level=logging.WARN) + #============================================================================ + + + class AskBotRequest(BaseModel): + bot_id: str = Field(None, description="The bot's id") + question: str = Field(None, description="The question the bot should answer") + + + @app.get('/bot/ask', summary="", tags=[bot_tag], security=security) + @uses_jwt() + def query_bot(query: AskBotRequest, decoded_jwt, user): + """ + Asks a chatbot + """ + + bot_id = query.bot_id + question = query.question + + start = datetime.now().timestamp() + + d = ask_bot(system_prompt + " " + question, bot_id) + + def get_scores(*args): + score_docs = d["get_score_docs"]() + return score_docs + + + def do_streaming(*args): + start_stream = datetime.now().timestamp() + answer = "" + for chunk in d["answer_generator"](): + answer += chunk + + stream_duration = round(datetime.now().timestamp() - start_stream, 2) + logger.info(f"Stream duration: {stream_duration}") + return answer + + + async def f(): + ls = await asyncio.gather( + asyncio.to_thread(get_scores, 1,2,3), + asyncio.to_thread(do_streaming, 1,2,3) + ) + return ls + + [score_docs, answer] = asyncio.run(f()) + + duration = round(datetime.now().timestamp() - start, 2) + + logger.info("Total duration: " + str(duration)) + + return jsonify({ + "answer": answer, + "duration": str(duration), + #"docs": ls#, + "score_docs": xs + }) + + + + + #-----------------Embedding---------------------- + + class TrainTextRequest(BaseModel): + bot_id: str = Field(None, description="The bot's id") + text: str = Field(None, description="Some text") + + + @app.post('/bot/train/text', summary="", tags=[bot_tag], security=security) + @uses_jwt() + def upload(form: TrainTextRequest, decoded_jwt, user): + """ + Caution: Long running request! 
+ """ + bot_id = form.bot_id + text = form.text + + # validate body + if not bot_id: + return jsonify({ + 'status': 'error', + 'message': 'chatbotId is required' + }), 400 + + if not text: + return jsonify({ + 'status': 'error', + 'message': 'No data source found' + }), 400 + + + with MeasureTime("Training on text", logger) as _: + train_text(bot_id, text) + + return jsonify({ + "status": "success" + }) + + #-------- non api routes ------------- + + @app.route("/") #Index Verzeichnis + def index(): + return send_from_directory('./public', "index.html") + + + @app.route("/info") + def debug_info(): + import multiprocessing + + def get_module_versions(): + with open("requirements.txt", "r") as f: + modules = {line.split("#")[0] for line in f.read().split("\n") if line.split("#")[0] != ""} + with open("current_requirements.txt", "r") as f: + d = {k: v for (k, v) in [line.split("#")[0].split("==") for line in f.read().split("\n") if line.split("#")[0] != ""]} + return {k: v for k, v in d.items() if k in modules} + + d = {} + d["module_versions"] = get_module_versions() + d["OLLAMA_NUM_PARALLEL"] = os.getenv("OLLAMA_NUM_PARALLEL") + d["OLLAMA_MAX_LOADED_MODELS"] = os.getenv("OLLAMA_MAX_LOADED_MODELS") + d["cpus"] = multiprocessing.cpu_count() + + #return "CPUs: " + str(cpus) + "
" + json.dumps(get_module_versions(), indent=4).replace("\n", "
") + + return json.dumps(d, indent=4).replace("\n", "
") + + + + + @app.route('/') #generische Route (auch Unterordner) + def catchAll(path): + logger.info(f"Path requested: '{path}'") + return send_from_directory('./public', path) + + + + lvl = get_log_level() + #print(f"Log level: {lvl}", flush=True) + + logger.addHandler(ElasticsearchLogHandler(level=logging.INFO)) + logger.setLevel(lvl) - #TODO: implement some kind of logging mechanism download_llm("llama3") connections.create_connection(hosts=elastic_url, request_timeout=60) wait_for_elasticsearch() init_indicies() create_default_users() - + return app + if __name__ == '__main__': app = create_app() - app.run(debug=False, host='0.0.0.0') + app.run(debug=True, host='0.0.0.0') diff --git a/backend/lib/chatbot.py b/backend/lib/chatbot.py index 03824f9..5bb82fd 100644 --- a/backend/lib/chatbot.py +++ b/backend/lib/chatbot.py @@ -87,26 +87,11 @@ def train_text(bot_id, text): -#TODO add history + def ask_bot(question, bot_id): - bot = Chatbot.get(id=bot_id) - llm = Ollama( - model=bot.llm_model, - base_url=ollama_url - ) - query = bot.system_prompt + " " + question - for chunk in llm.stream(query): - yield chunk - - - -#connections.get_connection() -#if es.indices.exists(index="index"): - -def ask_bot2(question, bot_id): """ - Asks a chatbot + Asks a chatbot using RAG resources """ bot = Chatbot.get(id=bot_id) @@ -159,7 +144,6 @@ def ask_bot2(question, bot_id): xs.append([score, dict(doc)]) return xs - return { "answer_generator": gen_func, "get_score_docs": get_score_docs diff --git a/backend/lib/knowledge.py b/backend/lib/knowledge.py new file mode 100644 index 0000000..5c61659 --- /dev/null +++ b/backend/lib/knowledge.py @@ -0,0 +1,38 @@ + + + + +from neo4j import GraphDatabase + +""" +pwd = "neo4j2" +proto = "bolt" +host = "192.168.99.101" + +driver = GraphDatabase.driver("%s://%s:7687" % (proto, host), auth=("neo4j", pwd), encrypted=False) + +def add_friend(tx, name, friend_name): + tx.run("MERGE (a:Person {name: $name}) " + "MERGE (a)-[:KNOWS]->(friend:Person 
{name: $friend_name})", + name=name, friend_name=friend_name) + +def print_friends(tx, name): + for record in tx.run("MATCH (a:Person)-[:KNOWS]->(friend) WHERE a.name = $name " + "RETURN friend.name ORDER BY friend.name", name=name): + print(record["friend.name"]) + +with driver.session() as session: + session.write_transaction(add_friend, "Arthur", "Guinevere") + session.write_transaction(add_friend, "Arthur", "Lancelot") + session.write_transaction(add_friend, "Arthur", "Merlin") + session.read_transaction(print_friends, "Arthur") + +driver.close() +""" + + + + + + + diff --git a/backend/lib/logging.py b/backend/lib/logging.py new file mode 100644 index 0000000..496235b --- /dev/null +++ b/backend/lib/logging.py @@ -0,0 +1,68 @@ +import logging +import os +from datetime import datetime, date +from lib.models import LogEntry + + + + +class ElasticsearchLogHandler(logging.Handler): + + def __init__(self, level): + logging.Handler.__init__(self=self) + #super().__init__(self=self) + self.setLevel(level) + + def emit(self, record): + + #print(str(record.__dict__), flush=True) + + #{'name': 'werkzeug', + # 'msg': '192.168.64.1 - - [07/Sep/2024 11:43:23] "%s" %s %s', + # 'args': ('GET /socket.io/?EIO=4&transport=websocket&sid=MtyTmZQs5IA6DnvhAAAA HTTP/1.1', '200', '-'), + # 'levelname': 'INFO', + # 'levelno': 20, + # 'pathname': '/usr/local/lib/python3.12/dist-packages/werkzeug/_internal.py', + # 'filename': '_internal.py', + # 'module': '_internal', + # 'exc_info': None, + # 'exc_text': None, + # 'stack_info': None, + # 'lineno': 97, + # 'funcName': '_log', + # 'created': 1725709403.1972203, + # 'msecs': 197.0, + # 'relativeCreated': 37105.026721954346, + # 'thread': 133472930760384, + # 'threadName': 'Thread-15 (process_request_thread)', + # 'processName': 'MainProcess', + # 'process': 26, + # 'taskName': None} + + + entry = LogEntry( + message = record.msg, + level = record.levelname, #record.levelno, + creation_time = datetime.now(), + + name = record.name, + 
pathname = record.pathname, + filename = record.filename, + module = record.module, + lineno = record.lineno, + funcName = record.funcName, + threadName = record.threadName, + processName = record.processName + ) + entry.save() + + +def get_log_level(default=logging.WARN): + level = logging.getLevelName(os.getenv("LOG_LEVEL", "").strip().upper()) #name lookup instead of eval() on an environment variable + if isinstance(level, int): #unset or unknown names come back as a "Level ..." string + return level + return default + + + + diff --git a/backend/lib/models.py b/backend/lib/models.py index 0e8062e..921824d 100644 --- a/backend/lib/models.py +++ b/backend/lib/models.py @@ -1,7 +1,6 @@ import os from elasticsearch_dsl import Document, InnerDoc, Nested, Date, Integer, Keyword, Float, Long, Text, connections, Object, Boolean - class User(Document): creation_date = Date() email = Keyword() @@ -29,7 +28,6 @@ class User(Document): return super(User, self).save(**kwargs) - class Chatbot(Document): creation_date = Date() changed_date = Date() @@ -63,11 +61,6 @@ class Chatbot(Document): return super(Chatbot, self).save(**kwargs) - - - - - class Text(Document): creation_date = Date() creator_id = Keyword() @@ -84,62 +77,101 @@ class Text(Document): return super(Text, self).save(**kwargs) - -#======= Query Log =========== - - -class Sources(InnerDoc): - score = Float() - #sourceFileId = Text() - sourceType = Text() - tags = Text() - - #new fields - sourceFileId = Keyword() - filename = Keyword() - url = Keyword() - txt_id = Keyword() - page = Integer() - - - -class QueryLog(Document): - answer = Text() - question = Text() - - chatbotid = Keyword() - durasecs = Float() - #inCt = Float() - inToks = Long() - llm = Text() - #outCt = Float() - outToks = Long() - - #queryid = Keyword() - #rating = Long() - #reason = Text() - #reasontags = Text() - session = Keyword() - - sources = Object(Sources) - temperature = Float() - #totalCt = Float() - - timest = Date() #timestamp - date = Date() #iso date +class Question(Document): + question = Text(index=False, required=True) + md5 = Keyword() class Index: - name = 'query_log' + 
name = 'question' settings = { "number_of_shards": 1, } def save(self, ** kwargs): - return super(QueryLog, self).save(**kwargs) + return super(Question, self).save(**kwargs) +class Answer(Document): + question_id = Keyword() + answer = Text(index=False, required=True) + md5 = Keyword() + + class Index: + name = 'answer' + settings = { + "number_of_shards": 1, + } + + def save(self, ** kwargs): + return super(Answer, self).save(**kwargs) + + +class LogEntry(Document): + message = Text(index=False, required=True) + level = Keyword() #Integer(required=True) + creation_time = Date() + + + name = Keyword() + + # 'args': ('GET /socket.io/?EIO=4&transport=websocket&sid=MtyTmZQs5IA6DnvhAAAA HTTP/1.1', '200', '-'), + + pathname = Keyword() + # 'pathname': '/usr/local/lib/python3.12/dist-packages/werkzeug/_internal.py', + + filename = Keyword() + # 'filename': '_internal.py', + + module = Keyword() + # 'module': '_internal', + + lineno = Integer(required=True) + # 'lineno': 97, + + funcName = Keyword() + # 'funcName': '_log', + + + # 'created': 1725709403.1972203, + # 'msecs': 197.0, + + threadName = Keyword() + # 'threadName': 'Thread-15 (process_request_thread)', + + processName = Keyword() + # 'processName': 'MainProcess', + + + + + + class Index: + name = 'logentry' + settings = { + "number_of_shards": 1, + } + + def save(self, ** kwargs): + return super(LogEntry, self).save(**kwargs) + + +#======= Query Log =========== + + +#class Sources(InnerDoc): +#score = Float() +#tags = Text() +#filename = Keyword() +#page = Integer() + + +#---------------------------------------------- def init_indicies(): - # create the mappings in elasticsearch - for Index in [QueryLog, Chatbot, User, Text]: + """ + Create the mappings in elasticsearch + """ + for Index in [LogEntry, Question, Answer, Chatbot, User, Text]: Index.init() + + diff --git a/deployment/docker-compose.yml b/deployment/docker-compose.yml index 90dfb31..406fad2 100644 --- a/deployment/docker-compose.yml +++ 
b/deployment/docker-compose.yml @@ -32,6 +32,31 @@ services: - MINIO_DEFAULT_BUCKETS=defaultbucket command: server --console-address ":29001" /data + neo4j: + container_name: ${APP_PREFIX}_neo4j + image: neo4j + #image: neo4j:3.5 + #image: neo4j:4.1 + restart: unless-stopped + ports: + - 7474:7474 + - 7687:7687 + volumes: + - ./conf:/conf + - ./data:/data + - ./import:/import + - ./logs:/logs + - ./plugins:/plugins + environment: + - NEO4J_AUTH=neo4j/your_password + + # Raise memory limits + - NEO4J_server_memory_pagecache_size=512M + - NEO4J_server_memory_heap_max__size=512M + + - NEO4J_dbms_usage__report_enabled=false + + elasticsearch: container_name: ${APP_PREFIX}_elasticsearch image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0 @@ -64,6 +89,10 @@ services: restart: always ports: - "11434:11434" + environment: + - OLLAMA_NUM_PARALLEL=4 + - OLLAMA_MAX_LOADED_MODELS=4 + volumes: - ..:/code - ../ollama/ollama:/root/.ollama @@ -75,6 +104,12 @@ services: - /dev/dri + + + + + + #ollama-webui: # container_name: ${APP_PREFIX}_ollama-webui # image: ghcr.io/ollama-webui/ollama-webui:main diff --git a/deployment/sample.env b/deployment/sample.env index 14f0506..211c979 100644 --- a/deployment/sample.env +++ b/deployment/sample.env @@ -5,6 +5,7 @@ APP_PREFIX=creative_bots DEFAULT_USERS=[["user@mail.net", "12345", "user"], ["admin@mail.net", "12345", "admin"]] #JWT encryption secret: -SECRET=1234 - +SECRET=23A344F670E +#WARN INFO FATAL +LOG_LEVEL=WARN