CreativeBots/backend/lib/elastictools.py
Tobias Weise fc96b462a3
All checks were successful
Gitea Docker Redeploy / Redploy-App-on-self-via-SSH (push) Successful in 9m5s
trying to install amd support
2024-09-02 16:35:15 +02:00

86 lines
2.7 KiB
Python

"""
Some helper functions to make querying easier
"""
import time, json, os
from typing import Any, Tuple, List, Dict, Any, Callable, Optional
from elasticsearch import NotFoundError, Elasticsearch # for normal read/write without vectors
from elasticsearch_dsl import Search, A, UpdateByQuery, Document, Date, Integer, Keyword, Float, Long, Text, connections
from elasticsearch.exceptions import ConnectionError
def get_by_id(index: str, id_field_name: str, id_value: str):
client = connections.get_connection()
response = Search(using=client, index=index).filter("term", **{id_field_name: id_value})[0:10000].execute()
return [hit.to_dict() for hit in response]
def update_by_id(index: str, id_field_name: str, id_value: str, values_to_set: Dict[str, Any]) -> None:
client = connections.get_connection()
#create painless insert script
source = ""
for k, v in values_to_set.items():
source += f"ctx._source.{k} = {json.dumps(v)};"
ubq = UpdateByQuery(using=client, index=index) \
.query("term", **{id_field_name: id_value}) \
.script(source=source, lang="painless")
response = ubq.execute()
return response.success()
def delete_by_id(index: str, id_field_name: str, id_value: str):
client = connections.get_connection()
s = Search(using=client, index=index).filter("term", **{id_field_name: id_value})
response = s.delete()
#if not response.success():
# raise Exception("Unable to delete id '%s' in index '%' !" % (index, id_value))
print(response, flush=True)
def get_datetime_interval(search: Search, start, end) -> Search:
return search.filter("range", timest={"gte": start}).filter("range", timest={"lte": end})
#schema intro spection and maybe comparison/diffing
#TODO: route that takes a schema json and compares to internal structure and returns boolean
def simplify_properties(d):
new_d = {}
for field, d3 in d["properties"].items():
if "type" in d3:
new_d[field] = d3["type"]
elif "properties" in d3:
new_d[field] = simplify_properties(d3)
return new_d
def get_type_schema():
client = connections.get_connection()
d = client.indices.get(index="*").body
new_d = {}
for index, d2 in d.items():
new_d[index] = simplify_properties(d2["mappings"])
return new_d
def wait_for_elasticsearch():
i = 1
while True:
try:
client = connections.get_connection()
client.indices.get_alias(index="*")
print("Elasticsearch found! Run Flask-app!", flush=True)
return
except ConnectionError:
i *= 2 #1.5
time.sleep(i)
print("Elasticsearch not found! Wait %s seconds!" % i, flush=True)