diff --git a/backend/app.py b/backend/app.py index 525a4a3..b32b693 100644 --- a/backend/app.py +++ b/backend/app.py @@ -7,28 +7,23 @@ OpenAPI access via http://localhost:5000/openapi/ on local docker-compose deploy #warnings.filterwarnings("ignore") #std lib modules: -import os, sys, json +import os, sys, json, time from typing import Any, Tuple, List, Dict, Any, Callable, Optional from datetime import datetime, date from collections import namedtuple import hashlib, traceback, logging +from functools import wraps #llm from langchain.callbacks.manager import CallbackManager from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler from langchain_community.llms import Ollama -#import openai #even used? import tiktoken from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain.chains import RetrievalQA -#from langchain.callbacks import get_openai_callback -#from langchain_community.callbacks import get_openai_callback -#from langchain_openai import ChatOpenAI, AzureChatOpenAI -#from langchain_openai import OpenAIEmbeddings, AzureOpenAIEmbeddings from langchain_community.vectorstores.elasticsearch import ElasticsearchStore -#from langchain.document_loaders import PyPDFLoader, Docx2txtLoader from langchain_community.document_loaders import PyPDFLoader, Docx2txtLoader from langchain.callbacks.base import BaseCallbackHandler, BaseCallbackManager @@ -36,11 +31,15 @@ from langchain.prompts import PromptTemplate #ext libs from elasticsearch import NotFoundError, Elasticsearch # for normal read/write without vectors -from elasticsearch_dsl import Search, A -from elasticsearch_dsl import Document, Date, Integer, Keyword, Float, Long, Text, connections +from elasticsearch_dsl import Search, A, Document, Date, Integer, Keyword, Float, Long, Text, connections +from elasticsearch.exceptions import ConnectionError from pydantic import BaseModel, Field +import logging_loki +import jwt as pyjwt + + #flask, openapi from flask import Flask, send_from_directory, Response, request, jsonify import sys, os @@ -50,20 +49,159 @@ from flask_openapi3 import Info, Tag, OpenAPI, Server, FileStorage from flask_socketio import SocketIO, join_room, leave_room, rooms, send -#home grown +import base64 +import os +from cryptography.fernet import Fernet +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC + +#----------home grown-------------- #from scraper import WebScraper from funcs import group_by -#from elastictools import update_by_id, delete_by_id +from elastictools import get_by_id, update_by_id, delete_by_id +from models import QueryLog, Chatbot, User -#TODO: implement some kind of logging mechanism -#logging.basicConfig(filename='record.log', level=logging.DEBUG) -#logging.basicConfig(level=logging.DEBUG) -logging.basicConfig(level=logging.WARN) -app = Flask(__name__) -from flask_cors import CORS #falls cross-orgin verwendet werden soll -CORS(app) +#LLM_PAYLOAD = int(os.getenv("LLM_PAYLOAD")) +#CHUNK_SIZE = int(os.getenv("CHUNK_SIZE")) +BOT_ROOT_PATH = os.getenv("BOT_ROOT_PATH") + + +# JWT Bearer Sample +jwt = { + "type": "http", + "scheme": "bearer", + "bearerFormat": "JWT" +} +security_schemes = {"jwt": jwt} +security = [{"jwt": []}] + + +info = Info( + title="Chatbot-API", + version="1.0.0", + summary="The REST-API", + description="Default model: ..." 
+) +servers = [ + Server(url=BOT_ROOT_PATH ) +] + +class NotFoundResponse(BaseModel): + code: int = Field(-1, description="Status Code") + message: str = Field("Resource not found!", description="Exception Information") + +app = OpenAPI( + __name__, + info=info, + servers=servers, + responses={404: NotFoundResponse}, + security_schemes=security_schemes + ) + + +def jwt_required(f): + """ + Wraps routes in a jwt-required logic and passes decoded jwt and user from elasticsearch to the route as keyword + """ + + @wraps(f) + def decorated_route(*args, **kwargs): + token = None + if "Authorization" in request.headers: + token = request.headers["Authorization"].split(" ")[1] + if not token: + return jsonify({ + 'status': 'error', + "message": "Authentication Token is missing!", + }), 401 + + try: + data = pyjwt.decode(token, app.config["jwt_secret"], algorithms=["HS256"]) + except Exception as e: + return jsonify({ + 'status': 'error', + "message": "JWT-decryption: " + str(e) + }), 401 + + try: + #user = get_by_id(client, index="user", id_field_name="email", id_value=data["email"])[0] + response = Search(using=client, index="user").filter("term", **{"email": data["email"]})[0:5].execute() + for hit in response: + user = hit + break + + except Exception as e: + return jsonify({ + 'status': 'error', + "message": "Invalid Authentication token!" + }), 401 + + kwargs["decoded_jwt"] = data + kwargs["user"] = user + return f(*args, **kwargs) + + return decorated_route + + + + + + + + +def create_key(salt: str, user_email: str) -> Fernet: + """ + Example salt: 9c46f833b3376c5f3b64d8a93951df4b + Fernet usage: token = f.encrypt(b"Secret message!") + """ + salt_bstr = bytes(salt, "utf-8") + email_bstr = bytes(user_email, "utf-8") + #password = b"password" + #salt = os.urandom(16) + #salt = b"9c46f833b3376c5f3b64d8a93951df4b" + kdf = PBKDF2HMAC( + algorithm=hashes.SHA256(), + length=32, + salt=salt_bstr, + iterations=48, + ) + key = base64.urlsafe_b64encode(kdf.derive(email_bstr)) + return Fernet(key) + + + + +#app = Flask(__name__) + + +app.config['UPLOAD_FOLDER'] = 'uploads' +app.config['CORS_HEADERS'] = 'Content-Type' +app.config['CORS_METHODS'] = ["GET,POST,OPTIONS,DELETE,PUT"] + + +env_to_conf = { + "BACKEND_INTERNAL_URL": "api_url", + "ELASTIC_URI": "elastic_uri", + "SECRET": "jwt_secret" +} + +#import values from env into flask config and do existence check +for env_key, conf_key in env_to_conf.items(): + x = os.getenv(env_key) + if not x: + msg = "Environment variable '%s' not set!" 
% env_key + app.logger.fatal(msg) + sys.exit(1) + else: + app.config[conf_key] = x + + + + +#from flask_cors import CORS #falls cross-orgin verwendet werden soll +#CORS(app) socket = SocketIO(app, cors_allowed_origins="*") @@ -74,7 +212,7 @@ def sockcon(data): to avoid broadcasting messages answer in callback only to room with sid """ - room = request.sid + room = request.sid + request.remote_addr join_room(room) socket.emit('backend response', {'msg': f'Connected to room {room} !', "room": room}) # looks like iOS needs an answer @@ -112,16 +250,129 @@ def handle_message(message): +def create_embedding(): + pass + + +def hash_password(s: str) -> str: + return hashlib.md5(s.encode('utf-8')).hexdigest() + + +#======================= TAGS ============================= + +jwt_tag = Tag(name='JWT', description='Requires a valid JSON Web Token') +not_implemented_tag = Tag(name='Not implemented', description='Functionality not yet implemented beyond an empty response') #==============Routes=============== +class LoginRequest(BaseModel): + email: str = Field(None, description='A short text by the user explaining the rating.') + password: str = Field(None, description='A short text by the user explaining the rating.') + + +@app.post('/login', summary="", tags=[], security=security) +def login(form: LoginRequest): + """ + Get your JWT to verify access rights + """ + client = Elasticsearch(app.config['elastic_uri']) + match get_by_id(client, index="user", id_field_name="email", id_value=form.email): + case []: + msg = "User with email '%s' doesn't exist!" % form.email + app.logger.error(msg) + return jsonify({ + 'status': 'error', + 'message': msg + }), 400 + + case [user]: + if user["password_hash"] == hash_password(form.password): + return pyjwt.encode({"email": form.email}, app.config['jwt_secret'], algorithm="HS256") + else: + msg = "Invalid password!" + app.logger.error(msg) + return jsonify({ + 'status': 'error', + 'message': msg + }), 400 + + + +class IndexSchemaRequest(BaseModel): + #end: datetime = Field("2100-01-31T16:47+00:00", description="""The interval end datetime in ISO 8601 format""") + pass + + + + + + + +@app.get('/bot', summary="", tags=[jwt_tag], security=security) +@jwt_required +def get_all_bots(decoded_jwt, user): + """ + List all bots for a user identified by the JWT. + """ + #client = Elasticsearch(app.config['elastic_uri']) + #bots = get_by_id(client, index="chatbot", id_field_name="createdBy", id_value=nextsearch_user.meta.id) + #return jsonify(bots) + return jsonify([]) + + + +@app.post('/bot', summary="", tags=[jwt_tag, not_implemented_tag], security=security) +@jwt_required +def create_bot(query: IndexSchemaRequest): + """ + Creates a chatbot for the JWT associated user. 
+ """ + return "" + + + +#======== DEBUG routes ============ + +@app.get('/bot/debug/schema', summary="", tags=[]) +def get_schema(query: IndexSchemaRequest): + """ + + """ + #chatbots = query.chatbots + #client = Elasticsearch(app.config['elastic_uri']) + + def simplify_properties(d): + new_d = {} + for field, d3 in d["properties"].items(): + if "type" in d3: + new_d[field] = d3["type"] + elif "properties" in d3: + new_d[field] = simplify_properties(d3) + return new_d + + + def get_type_schema(client: Elasticsearch): + d = client.indices.get(index="*").body + new_d = {} + for index, d2 in d.items(): + new_d[index] = simplify_properties(d2["mappings"]) + return new_d + + return jsonify( get_type_schema(client) ) + + +#TODO: route that takes a schema json and compares to internal structure and returns boolean + + +#-------- non api routes ------------- + @app.route("/") #Index Verzeichnis def index(): return send_from_directory('.', "index.html") -@app.route("/info") #spezielle Nutzer definierte Route -def info(): - return sys.version+" "+os.getcwd() +#@app.route("/info") #spezielle Nutzer definierte Route +#def info(): +# return sys.version+" "+os.getcwd() @app.route('/') #generische Route (auch Unterordner) def catchAll(path): @@ -129,13 +380,78 @@ def catchAll(path): +def init_indicies(): + # create the mappings in elasticsearch + for Index in [QueryLog, Chatbot, User]: + Index.init() + + +def create_default_users(): + #create default users + client = Elasticsearch(app.config['elastic_uri']) + default_users = os.getenv("DEFAULT_USERS") + if default_users: + for (email, pwd, role) in json.loads(default_users): + if len(get_by_id(client, index="user", id_field_name="email", id_value=email)) == 0: + user = User(email=email, password_hash=hash_password(pwd), role=role) + #user.published_from = datetime.now() + user.save() + + + if __name__ == '__main__': - #Wenn HTTPS benötigt wird (Pfade für RHEL7/können je OS variieren) - #cert = "/etc/pki/tls/certs/cert-payment.pem" #cert - #key = "/etc/pki/tls/private/cert-payment-private.pem" #key - #context = (cert, key) - #app.run(debug=True, host='0.0.0.0', ssl_context=context) - app.run(debug=True, host='0.0.0.0') - #app.run(debug=True) + + #TODO: implement some kind of logging mechanism + #logging.basicConfig(filename='record.log', level=logging.DEBUG) + #logging.basicConfig(level=logging.DEBUG) + logging.basicConfig(level=logging.WARN) + + """ + USE_LOKI_LOGGER = os.getenv("USE_LOKI_LOGGER") + if USE_LOKI_LOGGER: + handler = logging_loki.LokiHandler( + url="http://loki:3100/loki/api/v1/push", + tags={"application": "Nextsearch"}, + #auth=("username", "password"), + version="1", + ) + app.logger.addHandler(handler) + """ + + connections.create_connection(hosts=app.config['elastic_uri']) + + #client = Elasticsearch(app.config['elastic_uri']) + #client = Elasticsearch(hosts=[{"host": "elasticsearch"}], retry_on_timeout=True) + client = Elasticsearch(app.config['elastic_uri'], retry_on_timeout=True) + + + #TODO: find a clean way to wait without exceptions! + #Wait for elasticsearch to start up! + i = 1 + while True: + try: + #client = Elasticsearch(app.config['elastic_uri']) + + client.cluster.health(wait_for_status='yellow') + print("Elasticsearch found! Run Flask-app!", flush=True) + break + except ConnectionError: + i *= 1.5 + time.sleep(i) + print("Elasticsearch not found! Wait %s seconds!" 
% i, flush=True) + + + # Display cluster health + #app.logger.debug(connections.get_connection().cluster.health()) + + + init_indicies() + create_default_users() + app.run(debug=True, host='0.0.0.0') + + + + + diff --git a/backend/backend.env b/backend/backend.env index e69de29..5266e29 100644 --- a/backend/backend.env +++ b/backend/backend.env @@ -0,0 +1,32 @@ + + +ELASTIC_URI=http://elasticsearch:9200 + +DEFAULT_USERS=[["user@gmail.com", "1234", "user"], ["admin@gmail.com", "1234", "admin"]] + +# DEFAULT_USERS is list of lists, each nested list respectively contains email, password and role +# e.g. [["user@gmail.com", "1234", "user"], ["admin@gmail.com", "1234", "admin"]] +# leave empty if you don't wish to seed users + + +#-----------not used yet---------------- + +#JWT encryption secret: +SECRET=1234 + + +LLM_PAYLOAD=16384 +CHUNK_SIZE=1536 + + +#A flag for using Loki for logging. To deactivate comment it out +USE_LOKI_LOGGER=1 + + + + +BOT_ROOT_PATH=/ + +BACKEND_INTERNAL_URL=http://backend:5000 + + diff --git a/backend/elastictools.py b/backend/elastictools.py index 61400ec..bc3cf05 100644 --- a/backend/elastictools.py +++ b/backend/elastictools.py @@ -4,8 +4,12 @@ Some helper functions to make querying easier from typing import Any, Tuple, List, Dict, Any, Callable, Optional import json from elasticsearch import NotFoundError, Elasticsearch # for normal read/write without vectors -from elasticsearch_dsl import Search, A -from elasticsearch_dsl import Document, Date, Integer, Keyword, Float, Long, Text, connections +from elasticsearch_dsl import Search, A, UpdateByQuery, Document, Date, Integer, Keyword, Float, Long, Text, connections + + +def get_by_id(client: Elasticsearch, index: str, id_field_name: str, id_value: str): + response = Search(using=client, index=index).filter("term", **{id_field_name: id_value})[0:10000].execute() + return [hit.to_dict() for hit in response] def update_by_id(client: Elasticsearch, index: str, id_field_name: str, id_value: str, values_to_set: Dict[str, Any]) -> None: @@ -14,6 +18,7 @@ def update_by_id(client: Elasticsearch, index: str, id_field_name: str, id_value for k, v in values_to_set.items(): source += f"ctx._source.{k} = {json.dumps(v)};" + """ body = { "query": { "term": { @@ -26,6 +31,14 @@ def update_by_id(client: Elasticsearch, index: str, id_field_name: str, id_value } } client.update_by_query(index=index, body=body) + """ + + ubq = UpdateByQuery(using=client, index=index) \ + .query("term", **{id_field_name: id_value}) \ + .script(source=source, lang="painless") + + response = ubq.execute() + return response.success() diff --git a/backend/models.py b/backend/models.py index 5f47c9f..957cf45 100644 --- a/backend/models.py +++ b/backend/models.py @@ -1,84 +1,127 @@ import os -from elasticsearch_dsl import Document, InnerDoc, Date, Integer, Keyword, Float, Long, Text, connections, Object +from elasticsearch_dsl import Document, InnerDoc, Nested, Date, Integer, Keyword, Float, Long, Text, connections, Object, Boolean -# Define a default Elasticsearch client -connections.create_connection(hosts="http://localhost:9200") -class Article(Document): - title = Text(analyzer='snowball', fields={'raw': Keyword()}) - body = Text(analyzer='snowball') - tags = Keyword() - published_from = Date() - lines = Integer() + + +class User(Document): + email = Keyword() + password_hash = Text(index=False) + role = Keyword() + + #salt = Text(index=False) + #profileImage = Text(index=False) + #profileImage = Keyword() + + isEmailVerified = Boolean() + #status = 
Text() + + #otpExpires = Date() + #resetPasswordToken = Text(index=False) + #mailToken = Text(index=False) class Index: - name = 'blog' - settings = { - "number_of_shards": 1, - } - - def save(self, ** kwargs): - self.lines = len(self.body.split()) - return super(Article, self).save(** kwargs) - - -#======= nextsearch_log =========== - -class Sources(InnerDoc): - score = Float() - sourceFileId = Text() - sourceType = Text() - tags = Text() - -class NextsearchLog(Document): - a = Text() - chatbotid = Keyword() - durasecs = Float() - inCt = Float() - inToks = Long() - llm = Text() - outCt = Float() - outToks = Long() - q = Text() - queryid = Keyword() - rating = Long() - reason = Text() - reasontags = Text() - session = Keyword() - - sources = Object(Sources) #Text(analyzer='snowball') - temperature = Float() - totalCt = Float() - - timest = Date() #timestamp - date = Date() #iso date - - class Index: - #name = 'test_nextsearch_log' - name = 'nextsearch_log' + name = 'user' settings = { "number_of_shards": 1, } def save(self, ** kwargs): - self.lines = len(self.body.split()) - return super(NextsearchLog, self).save(** kwargs) + return super(User, self).save(**kwargs) + + + +class Chatbot(Document): + name = Text() + createdBy = Keyword() + description = Text() + systemPrompt = Text(index=False) + + #slug = Keyword() + files = Nested() + text = Text() + links = Nested() + + #chatbotImage = Text(index=False) + sourceCharacters = Integer() + + #visibility = Keyword() + #status = Keyword() + + temperature = Float() + llm_model = Keyword() + + + class Index: + name = 'chatbot' + settings = { + "number_of_shards": 1, + } + + def save(self, ** kwargs): + return super(Chatbot, self).save(**kwargs) + + + + + +#======= Query Log =========== + + +class Sources(InnerDoc): + score = Float() + #sourceFileId = Text() + sourceType = Text() + tags = Text() + + #new fields + sourceFileId = Keyword() + filename = Keyword() + url = Keyword() + txt_id = Keyword() + page = Integer() + + + +class QueryLog(Document): + answer = Text() + question = Text() + + chatbotid = Keyword() + durasecs = Float() + #inCt = Float() + inToks = Long() + llm = Text() + #outCt = Float() + outToks = Long() + + #queryid = Keyword() + #rating = Long() + #reason = Text() + #reasontags = Text() + session = Keyword() + + sources = Object(Sources) + temperature = Float() + #totalCt = Float() + + timest = Date() #timestamp + date = Date() #iso date + + class Index: + name = 'query_log' + settings = { + "number_of_shards": 1, + } + + def save(self, ** kwargs): + return super(QueryLog, self).save(**kwargs) + if __name__ == "__main__": elastic_uri = os.getenv("ELASTIC_URI") #elastic_uri = "http://localhost:9200" - assert elastic_uri - - # Define a default Elasticsearch client - connections.create_connection(hosts=elastic_uri) - #connections.create_connection(hosts) - - # create the mappings in elasticsearch - NextsearchLog.init() - - # create the mappings in elasticsearch - #Article.init() # create and save and article #article = Article(meta={'id': 42}, title='Hello world!', tags=['test']) diff --git a/backend/requirements.txt b/backend/requirements.txt index 6c71285..4eae149 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -13,3 +13,9 @@ Flask-SocketIO flask-openapi3 minio + +python-logging-loki +pyjwt +cryptography + + diff --git a/docker-compose.yml b/docker-compose.yml index 09d6c44..b7d94c2 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -36,12 +36,14 @@ services: container_name: 
${APP_PREFIX}_elasticsearch image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0 restart: always + mem_limit: 4024m ports: - "9200:9200" environment: - discovery.type=single-node - xpack.security.enabled=false - logger.level=ERROR + #- ES_JAVA_OPTS="-Xms2g -Xmx2g" volumes: - esdata:/usr/share/elasticsearch/data ulimits:
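
For reference, a minimal client-side sketch of the auth flow this patch adds (illustrative only, not part of the commit): POST the credentials seeded via DEFAULT_USERS in backend.env to /login to obtain a JWT, then call the JWT-protected /bot route with an Authorization Bearer header. It assumes the docker-compose stack is up, the backend is reachable on http://localhost:5000 (as stated in the app.py module docstring), and the requests library is available on the client side.

# auth_flow_example.py -- illustrative sketch, not part of the diff
import requests

BASE_URL = "http://localhost:5000"  # assumed local docker-compose deploy

# /login expects form fields matching LoginRequest (email, password);
# the example credentials come from DEFAULT_USERS in backend.env
resp = requests.post(f"{BASE_URL}/login",
                     data={"email": "user@gmail.com", "password": "1234"})
resp.raise_for_status()
token = resp.text  # on success the route returns the HS256-signed JWT as plain text

# jwt_required() reads the token from an "Authorization: Bearer <token>" header
bots = requests.get(f"{BASE_URL}/bot",
                    headers={"Authorization": f"Bearer {token}"})
print(bots.status_code, bots.json())  # currently an empty list, see get_all_bots()

On a failed login the route responds with HTTP 400 and a JSON body of the form {"status": "error", "message": "..."}; a missing or invalid token on /bot yields HTTP 401 from the jwt_required decorator.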