import logging import azure.functions as func from json import dumps as jsondump, loads as jsonload from dataclasses import dataclass, asdict from typing import Optional @dataclass(frozen=True) class AiKompasMessage: msg_type: int obj_id: Optional[str] @classmethod def fromServiceBusMessage(cls, msg: func.ServiceBusMessage) -> 'AiKompasMessage': s = msg.get_body().decode('utf-8') return AiKompasMessage(**jsonload(s)) def toJson(self) -> str: return jsondump(asdict(self)) @dataclass(frozen=True) class AiKompasDataset: id: int data: dict def main(req: func.HttpRequest, msg: func.Out[str], doc: func.Out[func.Document]) -> func.HttpResponse: logging.info('[CreateDataset] Starting') data = req.params.get('data') if not data: try: req_body = req.get_json() except ValueError: pass else: data = req_body.get('data') if data: newdoc = func.Document.from_dict(dict(data=data)) doc.set(newdoc) s = AiKompasMessage(msg_type=1, obj_id=newdoc.get('id')) msg.set(s.toJson()) return func.HttpResponse(jsondump(data)) else: return func.HttpResponse( "This HTTP triggered function executed successfully. Pass data in the query string or in the request body for a personalized response.", status_code=200 )