Projects
Eulaceura:Mainline
canal
_service:obs_scm:backport-fixed-issue-#4923-sup...
Sign Up
Log In
Username
Password
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File _service:obs_scm:backport-fixed-issue-#4923-support-fastJSON-Feature.LargeObject.patch of Package canal
From 163f71997142919ee1034e668b3bbe14c6e3e83f Mon Sep 17 00:00:00 2001 From: "jianghang.loujh" <jianghang.loujh@alibaba-inc.com> Date: Fri, 3 Nov 2023 12:39:53 +0800 Subject: [PATCH] fixed issue #4923 , support fastJSON Feature.LargeObject --- .../connector/kafka/producer/CanalKafkaProducer.java | 8 +++++--- .../pulsarmq/producer/CanalPulsarMQProducer.java | 3 ++- .../rabbitmq/producer/CanalRabbitMQProducer.java | 3 ++- .../rocketmq/producer/CanalRocketMQProducer.java | 4 ++-- 4 files changed, 11 insertions(+), 7 deletions(-) diff --git a/connector/kafka-connector/src/main/java/com/alibaba/otter/canal/connector/kafka/producer/CanalKafkaProducer.java b/connector/kafka-connector/src/main/java/com/alibaba/otter/canal/connector/kafka/producer/CanalKafkaProducer.java index 9fdd342f58..0300cd13ef 100644 --- a/connector/kafka-connector/src/main/java/com/alibaba/otter/canal/connector/kafka/producer/CanalKafkaProducer.java +++ b/connector/kafka-connector/src/main/java/com/alibaba/otter/canal/connector/kafka/producer/CanalKafkaProducer.java @@ -9,7 +9,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import com.alibaba.otter.canal.common.utils.PropertiesUtils; import org.apache.commons.lang.StringUtils; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; @@ -21,6 +20,7 @@ import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONWriter; import com.alibaba.otter.canal.common.utils.ExecutorTemplate; +import com.alibaba.otter.canal.common.utils.PropertiesUtils; import com.alibaba.otter.canal.connector.core.producer.AbstractMQProducer; import com.alibaba.otter.canal.connector.core.producer.MQDestination; import com.alibaba.otter.canal.connector.core.producer.MQMessageUtils; @@ -249,13 +249,15 @@ private List<Future> send(MQDestination mqDestination, String topicName, Message FlatMessage flatMessagePart = partitionFlatMessage[i]; if (flatMessagePart != null) { records.add(new ProducerRecord<>(topicName, i, null, JSON.toJSONBytes(flatMessagePart, - JSONWriter.Feature.WriteNulls))); + JSONWriter.Feature.WriteNulls, + JSONWriter.Feature.LargeObject))); } } } else { final int partition = mqDestination.getPartition() != null ? mqDestination.getPartition() : 0; records.add(new ProducerRecord<>(topicName, partition, null, JSON.toJSONBytes(flatMessage, - JSONWriter.Feature.WriteNulls))); + JSONWriter.Feature.WriteNulls, + JSONWriter.Feature.LargeObject))); } } } diff --git a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java index 1b48893546..b13003dfd8 100644 --- a/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java +++ b/connector/pulsarmq-connector/src/main/java/com/alibaba/otter/canal/connector/pulsarmq/producer/CanalPulsarMQProducer.java @@ -13,6 +13,7 @@ import org.slf4j.LoggerFactory; import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONWriter; import com.alibaba.fastjson2.JSONWriter.Feature; import com.alibaba.otter.canal.common.utils.ExecutorTemplate; import com.alibaba.otter.canal.common.utils.NamedThreadFactory; @@ -330,7 +331,7 @@ private void sendMessage(String topic, int partition, List<FlatMessage> flatMess try { MessageId msgResultId = producer.newMessage() .property(MSG_PROPERTY_PARTITION_NAME, String.valueOf(partition)) - .value(JSON.toJSONBytes(f, Feature.WriteNulls)) + .value(JSON.toJSONBytes(f, Feature.WriteNulls, JSONWriter.Feature.LargeObject)) .send() // ; diff --git a/connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/producer/CanalRabbitMQProducer.java b/connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/producer/CanalRabbitMQProducer.java index a75434d44f..6327fe343f 100644 --- a/connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/producer/CanalRabbitMQProducer.java +++ b/connector/rabbitmq-connector/src/main/java/com/alibaba/otter/canal/connector/rabbitmq/producer/CanalRabbitMQProducer.java @@ -165,7 +165,8 @@ private void send(MQDestination canalDestination, String topicName, Message mess // 串行分区 List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(datas, messageSub.getId()); for (FlatMessage flatMessage : flatMessages) { - byte[] message = JSON.toJSONBytes(flatMessage, JSONWriter.Feature.WriteNulls); + byte[] message = JSON + .toJSONBytes(flatMessage, JSONWriter.Feature.WriteNulls, JSONWriter.Feature.LargeObject); if (logger.isDebugEnabled()) { logger.debug("send message:{} to destination:{}", message, canalDestination.getCanalDestination()); } diff --git a/connector/rocketmq-connector/src/main/java/com/alibaba/otter/canal/connector/rocketmq/producer/CanalRocketMQProducer.java b/connector/rocketmq-connector/src/main/java/com/alibaba/otter/canal/connector/rocketmq/producer/CanalRocketMQProducer.java index 824304e996..b2024e0364 100644 --- a/connector/rocketmq-connector/src/main/java/com/alibaba/otter/canal/connector/rocketmq/producer/CanalRocketMQProducer.java +++ b/connector/rocketmq-connector/src/main/java/com/alibaba/otter/canal/connector/rocketmq/producer/CanalRocketMQProducer.java @@ -263,7 +263,7 @@ public void send(final MQDestination destination, String topicName, List<Message> messages = flatMessagePart.stream() .map(flatMessage -> new Message(topicName, ((RocketMQProducerConfig) this.mqProperties).getTag(), - JSON.toJSONBytes(flatMessage, JSONWriter.Feature.WriteNulls))) + JSON.toJSONBytes(flatMessage, JSONWriter.Feature.WriteNulls, JSONWriter.Feature.LargeObject))) .collect(Collectors.toList()); // 批量发送 sendMessage(messages, index); @@ -278,7 +278,7 @@ public void send(final MQDestination destination, String topicName, List<Message> messages = flatMessages.stream() .map(flatMessage -> new Message(topicName, ((RocketMQProducerConfig) this.mqProperties).getTag(), - JSON.toJSONBytes(flatMessage, JSONWriter.Feature.WriteNulls))) + JSON.toJSONBytes(flatMessage, JSONWriter.Feature.WriteNulls,JSONWriter.Feature.LargeObject))) .collect(Collectors.toList()); // 批量发送 sendMessage(messages, partition);
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.
浙ICP备2022010568号-2