58 lines
1.5 KiB
Python
58 lines
1.5 KiB
Python
import logging
|
|
|
|
import azure.functions as func
|
|
|
|
from json import dumps as jsondump, loads as jsonload
|
|
from dataclasses import dataclass, asdict
|
|
from typing import Optional
|
|
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class AiKompasMessage:
|
|
msg_type: int
|
|
obj_id: Optional[str]
|
|
|
|
|
|
@classmethod
|
|
def fromServiceBusMessage(cls, msg: func.ServiceBusMessage) -> 'AiKompasMessage':
|
|
s = msg.get_body().decode('utf-8')
|
|
|
|
return AiKompasMessage(**jsonload(s))
|
|
|
|
|
|
def toJson(self) -> str:
|
|
return jsondump(asdict(self))
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class AiKompasDataset:
|
|
id: int
|
|
data: dict
|
|
|
|
|
|
|
|
def main(req: func.HttpRequest, msg: func.Out[str], doc: func.Out[func.Document]) -> func.HttpResponse:
|
|
logging.info('[CreateDataset] Starting')
|
|
|
|
data = req.params.get('data')
|
|
if not data:
|
|
try:
|
|
req_body = req.get_json()
|
|
except ValueError:
|
|
pass
|
|
else:
|
|
data = req_body.get('data')
|
|
|
|
if data:
|
|
newdoc = func.Document.from_dict(dict(data=data))
|
|
doc.set(newdoc)
|
|
s = AiKompasMessage(msg_type=1, obj_id=newdoc.get('id'))
|
|
msg.set(s.toJson())
|
|
return func.HttpResponse(jsondump(data))
|
|
else:
|
|
return func.HttpResponse(
|
|
"This HTTP triggered function executed successfully. Pass data in the query string or in the request body for a personalized response.",
|
|
status_code=200
|
|
)
|