java - swing html css




Кафка: написание кастомного сериализатора (2)

Я пытаюсь построить POC с Kafka 0.8.1. Я использую свой собственный класс Java в качестве сообщения Kafka, которое имеет кучу типов данных String. Я не могу использовать класс сериализатора по умолчанию или класс сериализатора String, который поставляется с библиотекой Kafka. Я думаю, мне нужно написать свой собственный сериализатор и передать его в свойствах производителя. Если вам известно о написании примера настраиваемого сериализатора в Kafka (на Java), пожалуйста, поделитесь. Ценю большое, большое спасибо.


Вам нужно реализовать как кодировать, так и декодировать

public class JsonEncoder implements Encoder<Object> {
        private static final Logger LOGGER = Logger.getLogger(JsonEncoder.class);

        public JsonEncoder(VerifiableProperties verifiableProperties) {
            /* This constructor must be present for successful compile. */
        }

        @Override
        public byte[] toBytes(Object object) {
            ObjectMapper objectMapper = new ObjectMapper();
            try {
                return objectMapper.writeValueAsString(object).getBytes();
            } catch (JsonProcessingException e) {
                LOGGER.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
            }
            return "".getBytes();
        }
    }

Код декодера

public class JsonDecoder  implements Decoder<Object> {
    private static final Logger LOGGER = Logger.getLogger(JsonEncoder.class);
    public JsonDecoder(VerifiableProperties verifiableProperties) {
        /* This constructor must be present for successful compile. */
    }

    @Override
    public Object fromBytes(byte[] bytes) {
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            return objectMapper.readValue(bytes, Map.class);
        } catch (IOException e) {
            LOGGER.error(String.format("Json processing failed for object: %s", bytes.toString()), e);
        }
        return null;
    }
}

Запись пом

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.4.1.3</version>
</dependency>

Установите кодировщик по умолчанию в свойстве Kafka

properties.put("serializer.class","kafka.serializer.DefaultEncoder");

Код писателя и читателя выглядит следующим образом

byte[] bytes = encoder.toBytes(map);
        KeyedMessage<String, byte[]> message =new KeyedMessage<String, byte[]>(this.topic, bytes);

JsonDecoder decoder = new JsonDecoder(null);
Map map = (Map) decoder.fromBytes(it.next().message());

Вещи, необходимые для написания собственного сериализатора:

  1. Реализация Encoder с объектом, указанным для универсального
    • Требуется наличие конструктора VerifiableProperties
  2. Переопределите toBytes(...) чтобы убедиться, что возвращается байтовый массив
  3. Вставьте класс сериализатора в ProducerConfig

Объявление собственного сериализатора для производителя

Как вы отметили в своем вопросе, Kafka предоставляет средства для объявления конкретного сериализатора для производителя. Класс сериализатора устанавливается в экземпляре ProducerConfig и этот экземпляр используется для создания требуемого класса Producer .

Если вы будете следовать примеру производителя Kafka, вы создадите ProducerConfig через объект Properties . При создании файла свойств обязательно укажите:

props.put("serializer.class", "path.to.your.CustomSerializer");

Путь к классу, который вы хотите, чтобы Kafka использовал для сериализации сообщений перед добавлением их в журнал.

Создание собственного сериализатора, который понимает Кафка

Написание собственного сериализатора, который Kafka может правильно интерпретировать, требует реализации класса Scala Encoder[T] который предоставляет Kafka. Реализация черт в Java - это странно , но следующий метод работал для сериализации JSON в моем проекте:

public class JsonEncoder implements Encoder<Object> {
    private static final Logger logger = Logger.getLogger(JsonEncoder.class);
    // instantiating ObjectMapper is expensive. In real life, prefer injecting the value.
    private static final ObjectMapper objectMapper = new ObjectMapper();

    public JsonEncoder(VerifiableProperties verifiableProperties) {
        /* This constructor must be present for successful compile. */
    }

    @Override
    public byte[] toBytes(Object object) {
        try {
            return objectMapper.writeValueAsString(object).getBytes();
        } catch (JsonProcessingException e) {
            logger.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
        }
        return "".getBytes();
    }
}

Ваш вопрос звучит так, как будто вы используете один объект (давайте назовем его CustomMessage ) для всех сообщений, добавляемых в ваш журнал. Если это так, ваш сериализатор может выглядеть примерно так:

package com.project.serializer;

public class CustomMessageEncoder implements Encoder<CustomMessage> {
    public CustomMessageEncoder(VerifiableProperties verifiableProperties) {
        /* This constructor must be present for successful compile. */
    }

    @Override
    public byte[] toBytes(CustomMessage customMessage) {
        return customMessage.toBytes();
    }
}

Что бы оставить конфигурацию вашего свойства выглядеть следующим образом:

props.put("serializer.class", "path.to.your.CustomSerializer");




apache-kafka