Skip to content

Avro #22

@aeldaly-tc

Description

@aeldaly-tc

Hello. Thanks so much for this nice project.

I was wondering what's the best way to use avro with this project?

Here's what I did:

async def produce_to_kafka_connect(params: CreateSearchParams):
    system_name = params.system_name
    topic = KafkaTopicsEnum.find_by_system_name(system_name)
    data = CreateSearchDocumentSchema.parse_obj(params.dict()).to_avro()

    await KAFKA_APP.publish(topic.value, data=data)
from typing import Any, Dict, List

from avro.schema import Schema, parse

from .model import BaseModel


class SchemaField(BaseModel):
    name: str
    type: Any


class BaseSchema(BaseModel):
    namespace: str = "ss.message_type.avro"
    type = "record"

    name: str
    fields_list: List[SchemaField]

    def json(
        self, *args: List[Any], **kwargs: Dict[str, Any]
    ):  # pyright: reportIncompatibleMethodOverride=false
        return super().json(*args, **kwargs).replace("fields_list", "fields")

    def to_avro(self) -> Schema:
        return parse(self.json())

the problem is publish assumes that data is a Pydantic model

    async def publish(
        self, stream_id: str, data: BaseModel, key: Optional[bytes] = None
    ) -> Awaitable[aiokafka.structs.ConsumerRecord]:
        if not self._intialized:
            async with self.get_lock("_"):
                await self.initialize()

        schema_key = getattr(data, "__key__", None)

Any suggestions on what we can do here?

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions