Can return null // if the bytes seem invalid.Import faust from faust.types import ModelT from import merge_headers from faust.models import registry class Autodetect ( faust. Schema ): def loads_key ( self, app, message, *, loads = None, serializer = None ): if loads is None : loads = app. loads_key # try to get key_type and serializer from Kafka headers headers = dict ( message. get ( 'KeyType' ) serializer = serializer or headers. get ( 'KeySerializer' ) if key_type_name : key_type = registry return loads ( key_type, message. key, serializer = serializer ) else : return super (). loads_key ( app, message, loads = loads, serializer = serializer ) def loads_value ( self, app, message, *, loads = None, serializer = None ): if loads is None : loads = app. loads_value # try to get key_type and serializer from Kafka headers headers = dict ( message. Get ( 'ValueType' ) serializer = serializer or headers.
0 Comments
Leave a Reply. |
AuthorWrite something about yourself. No need to be fancy, just an overview. Archives
December 2022
Categories |