Source code for nuclio_sdk.event
# Copyright 2017 The Nuclio Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import base64
import json
import datetime
class TriggerInfo(object):
def __init__(self, klass='', kind=''):
self.klass = klass
self.kind = kind
[docs]class Event(object):
def __init__(self,
body=None,
content_type=None,
trigger=None,
fields=None,
headers=None,
_id=None,
method=None,
path=None,
size=None,
timestamp=None,
url=None,
_type=None,
type_version=None,
version=None):
self.body = body
self.content_type = content_type
self.trigger = trigger or TriggerInfo(klass='', kind='')
self.fields = fields or {}
self.headers = headers or {}
self.id = _id
self.method = method
self.path = path or '/'
self.size = size
self.timestamp = timestamp or 0
self.url = url
self.type = _type
self.type_version = type_version
self.version = version
def to_json(self):
obj = vars(self).copy()
obj['trigger'] = {
'class': self.trigger.klass,
'kind': self.trigger.kind,
}
return json.dumps(obj)
def get_header(self, header_key):
for key, value in self.headers.items():
if key.lower() == header_key.lower():
return value
[docs] @staticmethod
def from_json(data):
"""Decode event encoded as JSON by processor"""
parsed_data = json.loads(data)
trigger = TriggerInfo(
parsed_data['trigger']['class'],
parsed_data['trigger']['kind'],
)
# extract content type, needed to decode body
content_type = parsed_data['content_type']
return Event(body=Event.decode_body(parsed_data['body'], content_type),
content_type=content_type,
trigger=trigger,
fields=parsed_data.get('fields'),
headers=parsed_data.get('headers'),
_id=parsed_data['id'],
method=parsed_data['method'],
path=parsed_data['path'],
size=parsed_data['size'],
timestamp=datetime.datetime.utcfromtimestamp(parsed_data['timestamp']),
url=parsed_data['url'],
_type=parsed_data['type'],
type_version=parsed_data['type_version'],
version=parsed_data['version'])
[docs] @staticmethod
def decode_body(body, content_type):
"""Decode event body"""
if isinstance(body, dict):
return body
else:
try:
decoded_body = base64.b64decode(body)
except:
return body
if content_type == 'application/json':
try:
return json.loads(decoded_body)
except:
pass
return decoded_body
def __repr__(self):
return self.to_json()