Skip to content

Commit 9f896be

Browse files
committed
Update crc32c module for Kafka message.
1 parent d08f097 commit 9f896be

File tree

3 files changed

+141
-566
lines changed

3 files changed

+141
-566
lines changed

src/protocol/KafkaMessage.cc

Lines changed: 19 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1413,7 +1413,7 @@ int KafkaMessage::parse_record_batch(void **buf, size_t *size,
14131413
return -1;
14141414
}
14151415

1416-
if ((int)crc32c(0, (const void *)*buf, hdr.length - 9) != hdr.crc)
1416+
if ((int)crc32c(*buf, hdr.length - 9) != hdr.crc)
14171417
{
14181418
errno = EBADMSG;
14191419
return -1;
@@ -1553,7 +1553,7 @@ int KafkaMessage::parse_records(void **buf, size_t *size, bool check_crcs,
15531553
return 0;
15541554
}
15551555
else if (ret < 0)
1556-
break;
1556+
return ret;
15571557
}
15581558

15591559
*size -= msg_set_size;
@@ -1563,14 +1563,6 @@ int KafkaMessage::parse_records(void **buf, size_t *size, bool check_crcs,
15631563

15641564
KafkaMessage::KafkaMessage()
15651565
{
1566-
static struct Crc32cInitializer
1567-
{
1568-
Crc32cInitializer()
1569-
{
1570-
crc32c_global_init();
1571-
}
1572-
} initializer;
1573-
15741566
this->parser = new kafka_parser_t;
15751567
kafka_parser_init(this->parser);
15761568
this->stream = new EncodeStream;
@@ -2081,6 +2073,7 @@ int KafkaRequest::encode_produce(struct iovec vectors[], int max)
20812073
int topic_cnt = 0;
20822074
this->toppar_list.rewind();
20832075
KafkaToppar *toppar;
2076+
KafkaBlock *block;
20842077

20852078
while ((toppar = this->toppar_list.get_next()) != NULL)
20862079
{
@@ -2210,11 +2203,7 @@ int KafkaRequest::encode_produce(struct iovec vectors[], int max)
22102203
append_i32(record_header, batch_length);
22112204
append_i32(record_header, 0);
22122205
append_i8(record_header, 2); //magic
2213-
2214-
uint32_t crc_32 = 0;
2215-
size_t crc32_offset = record_header.size();
2216-
2217-
append_i32(record_header, crc_32);
2206+
append_i32(record_header, 0);
22182207
append_i16(record_header, this->config.get_compress_type());
22192208
append_i32(record_header, batch_cnt - 1);
22202209
append_i64(record_header, first_timestamp);
@@ -2224,27 +2213,26 @@ int KafkaRequest::encode_produce(struct iovec vectors[], int max)
22242213
append_i32(record_header, -1);
22252214
append_i32(record_header, batch_cnt);
22262215

2227-
KafkaBlock *header_block = new KafkaBlock;
2228-
2229-
if (!header_block->set_block((void *)record_header.c_str(),
2230-
record_header.size()))
2216+
block = new KafkaBlock;
2217+
if (!block->set_block((void *)record_header.c_str(), record_header.size()))
22312218
{
2232-
delete header_block;
2219+
delete block;
22332220
return -1;
22342221
}
22352222

2236-
char *crc_ptr = (char *)header_block->get_block() + crc32_offset;
2223+
size_t crc32_offset = 8 + 4 + 4 + 1;
2224+
char *crc_ptr = (char *)block->get_block() + crc32_offset;
2225+
uint32_t crc_32 = crc32c_start();
22372226

2238-
this->serialized.insert_list(header_block);
2227+
this->serialized.insert_list(block);
22392228

2240-
crc_32 = crc32c(crc_32, (const void *)(crc_ptr + 4),
2241-
header_block->get_len() - crc32_offset - 4);
2229+
crc_32 = crc32c_continue(crc_ptr + 4, block->get_len() - crc32_offset - 4, crc_32);
22422230

22432231
this->serialized.block_insert_rewind();
2244-
KafkaBlock *block;
22452232
while ((block = this->serialized.get_block_insert_next()) != NULL)
2246-
crc_32 = crc32c(crc_32, block->get_block(), block->get_len());
2233+
crc_32 = crc32c_continue(block->get_block(), block->get_len(), crc_32);
22472234

2235+
crc_32 = crc32c_finish(crc_32);
22482236
*(uint32_t *)crc_ptr = htonl(crc_32);
22492237
*(uint32_t *)recordset_size_ptr = htonl(batch_length + 4 + 8);
22502238
}
@@ -2290,23 +2278,19 @@ int KafkaRequest::encode_produce(struct iovec vectors[], int max)
22902278
wrap_header.size() - crc32_offset - 4);
22912279

22922280
this->serialized.block_insert_rewind();
2293-
KafkaBlock *block;
2294-
22952281
while ((block = this->serialized.get_block_insert_next()) != NULL)
22962282
crc_32 = crc32(crc_32, (Bytef *)block->get_block(), block->get_len());
22972283

22982284
*(uint32_t *)crc_ptr = htonl(crc_32);
22992285

2300-
KafkaBlock *wrap_block = new KafkaBlock;
2301-
2302-
if (!wrap_block->set_block((void *)wrap_header.c_str(),
2303-
wrap_header.size()))
2286+
block = new KafkaBlock;
2287+
if (!block->set_block((void *)wrap_header.c_str(), wrap_header.size()))
23042288
{
2305-
delete wrap_block;
2289+
delete block;
23062290
return -1;
23072291
}
23082292

2309-
this->serialized.insert_list(wrap_block);
2293+
this->serialized.insert_list(block);
23102294
*(uint32_t *)recordset_size_ptr = htonl(message_size + 8 + 4);
23112295
}
23122296
else
@@ -2323,8 +2307,7 @@ int KafkaRequest::encode_produce(struct iovec vectors[], int max)
23232307
vectors[0].iov_base = (void *)this->msgbuf.c_str();
23242308
vectors[0].iov_len = this->msgbuf.size();
23252309

2326-
KafkaBlock *block = this->serialized.get_block_first();
2327-
2310+
block = this->serialized.get_block_first();
23282311
while (block)
23292312
{
23302313
this->stream->append_nocopy((const char *)block->get_block(),

0 commit comments

Comments
 (0)