1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
|
# Licensed to Elasticsearch B.V under one or more agreements.
# Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
# See the LICENSE file in the project root for more information
import aiohttp
import datetime
import os
from fastapi import FastAPI
from fastapi.encoders import jsonable_encoder
from elasticsearch import AsyncElasticsearch, NotFoundError
from elasticsearch.helpers import async_streaming_bulk
from elasticapm.contrib.starlette import ElasticAPM, make_apm_client
apm = make_apm_client(
{"SERVICE_NAME": "fastapi-app", "SERVER_URL": "http://apm-server:8200"}
)
client = AsyncElasticsearch(os.environ["ELASTICSEARCH_HOSTS"])
app = FastAPI()
app.add_middleware(ElasticAPM, client=apm)
@app.on_event("shutdown")
async def app_shutdown():
await client.close()
async def download_games_db():
async with aiohttp.ClientSession() as http:
url = "https://cdn.thegamesdb.net/json/database-latest.json"
resp = await http.request("GET", url)
for game in (await resp.json())["data"]["games"][:100]:
yield game
@app.get("/")
async def index():
return await client.cluster.health()
@app.get("/ingest")
async def ingest():
if not (await client.indices.exists(index="games")):
await client.indices.create(index="games")
async for _ in async_streaming_bulk(
client=client, index="games", actions=download_games_db()
):
pass
return {"status": "ok"}
@app.get("/search/{query}")
async def search(query):
return await client.search(
index="games", body={"query": {"multi_match": {"query": query}}}
)
@app.get("/delete")
async def delete():
return await client.delete_by_query(index="games", body={"query": {"match_all": {}}})
@app.get("/delete/{id}")
async def delete_id(id):
try:
return await client.delete(index="games", id=id)
except NotFoundError as e:
return e.info, 404
@app.get("/update")
async def update():
response = []
docs = await client.search(
index="games", body={"query": {"multi_match": {"query": ""}}}
)
now = datetime.datetime.utcnow()
for doc in docs["hits"]["hits"]:
response.append(
await client.update(
index="games", id=doc["_id"], body={"doc": {"modified": now}}
)
)
return jsonable_encoder(response)
@app.get("/error")
async def error():
try:
await client.delete(index="games", id="somerandomid")
except NotFoundError as e:
return e.info
@app.get("/doc/{id}")
async def get_doc(id):
return await client.get(index="games", id=id)
|