
PostgreSQL COPY network-bound in benchmark scenarios - no difference between binary and string format?

0 votes
0 answers
34 views
I am benchmarking the write performance of PostgreSQL (+ TimescaleDB) at very high loads for my particular use case. I won't go into details about the setup, requirements, table columns etc., but the scale for our particular table/use case is about 20 billion rows/month. For insertion I'm using the COPY command, as that seems to be the best way to achieve fast insertion into Postgres. I have a pretty good server and manage to achieve about 400k rows/sec insertion rate into the database, which comes close to fulfilling what we need, but...

Upon further investigation, monitoring the resources of the server, nothing really seemed to be the bottleneck on the server itself, and that's when I found out the whole thing is network-bound. Some simple math puts the network overhead at about 3000 bytes per row (!). This is highly inefficient, but we are copying strings after all, so it's understandable. However, there is another way to run the command: COPY ... FROM STDIN WITH BINARY. This should be more efficient, but I see basically the same overhead per row, which I do not understand. Is this expected, or am I doing something wrong on my side?

Here are some excerpts from the Python code I am using to benchmark binary copy. This is the part that does the insertion itself:
@retry
    def flush(self) -> Tuple[int, timedelta]:
        written_rows = 0
        time_taken = timedelta()
        if self.line_count > 0:
            self.buffer.write(struct.pack("!h", -1))  # 16-bit -1 trailer terminates the binary COPY stream
            self.buffer.seek(0)

            cur = self.conn.cursor()
            t_start = datetime.now()
            copy_command = f"copy {self.table_name} from stdin with binary"
            try:
                cur.copy_expert(copy_command, self.buffer)
            except Exception:
                logging.exception("Error occurred while writing the buffer")
            written_rows = cur.rowcount
            self.conn.commit()
            t_end = datetime.now()
            time_taken = t_end - t_start
            logging.info(f"Writing {written_rows} rows into {self.table_name} took {t_end - t_start}")

            self.buffer = BytesIO()
            # Reset the buffer with the 11-byte PGCOPY signature, flags field and header extension length
            self.buffer.write(struct.pack('!11sii', b'PGCOPY\n\377\r\n\0', 0, 0))
            self.line_count = 0
        return (written_rows, time_taken)
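For reference, the binary COPY wire format is quite lean: each tuple carries a 16-bit field count, and each field a 32-bit length word followed by its data. A back-of-the-envelope estimate for the event row assembled by _format_event_line below suggests the serialized payload should be on the order of 100 bytes per row, not 3000. This is only a sketch under assumptions (the average string length is a guess, not a measured value), not part of the benchmark code:

# Rough per-row payload estimate for the event row assembled by _format_event_line below.
# Assumes all numeric fields are non-NULL and the string value averages
# avg_string_bytes bytes; both are assumptions, not measured values.
def estimate_event_row_bytes(avg_string_bytes=32):
    fields = 8                               # 16-bit field count written per tuple
    length_words = 4 * fields                # each field has a 4-byte length header
    numeric_data = 8 + 8 + 8 + 8 + 4 + 4     # element_id, ts, value, status, manager, user
    return 2 + length_words + numeric_data + avg_string_bytes  # NULL fields add no data

print(estimate_event_row_bytes())  # 106 bytes with these assumptions

Comparing that figure with len(self.buffer.getbuffer()) / self.line_count logged just before copy_expert would show whether the ~3000 bytes/row is already present in the serialized payload or only appears at the network level.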
This is the part that does the formatting of what goes into the buffer:
class BinaryCopyTimescaleInsertionFormatter(BaseTimescaleInsertionFormatter):

    # Per-column (struct format, byte size) pairs, indexed by column position
    metadata_row_format_part_1 = list(zip(('q', 'q', 'q', '?', '?'),
                                          ( 8,   8,   8,   1,   1)))
    metadata_row_format_part_2 = list(zip(('i',),
                                          (4,)))
    event_row_format = list(zip(('q', 'q', 'd', 'q', 'i', 'i', 'i'),
                                (8,   8,   8,   8,   4,   4,   4)))

    def encode_string(self, string_to_encode):
        # The 4-byte field length must be the byte length of the encoded string,
        # not the character count, so encode first and measure the result
        encoded = string_to_encode.encode("utf-8")
        return struct.pack('!i', len(encoded)) + encoded

    def sanitize_and_encode_string(self, string_to_encode):
        return self.encode_string(sanitize_string(string_to_encode))

    def encode_null(self):
        # A field length of -1 marks a NULL value (no data bytes follow)
        return struct.pack('!i', -1)

    def pack_col(self, col, row_format, val):
        # row_format[col] is a (struct format, byte size) pair
        fmt, size = row_format[col]
        return struct.pack('!i' + fmt, size, val)
    
    def format_metadata_line(self, metadata):
        
        binary_data = bytearray(struct.pack('!h', 11))  # 16-bit field count: 11 columns
        binary_data.extend(self.pack_col(0, self.metadata_row_format_part_1, metadata.element_id))
        binary_data.extend(self.pack_col(1, self.metadata_row_format_part_1, 1))
        binary_data.extend(self.pack_col(2, self.metadata_row_format_part_1, metadata.type))
        binary_data.extend(self.pack_col(3, self.metadata_row_format_part_1, True))
        binary_data.extend(self.pack_col(4, self.metadata_row_format_part_1, False))
        binary_data.extend(self.sanitize_and_encode_string(metadata.element_name))
        binary_data.extend(self.sanitize_and_encode_string(metadata.element_name))
        binary_data.extend(self.pack_col(0, self.metadata_row_format_part_2, metadata.dpt_id))
        binary_data.extend(self.sanitize_and_encode_string(metadata.unit))
        binary_data.extend(self.sanitize_and_encode_string(metadata.alias))
        binary_data.extend(self.sanitize_and_encode_string(metadata.comment))
        
        return bytes(binary_data)

    def _format_event_line(self, event):
        if DpeType(event.type) == DpeType.STRING or DpeType(event.type) == DpeType.DYN_STRING:
            el_num = None
            el_str = event.value
        else:
            el_num = event.value
            el_str = None

        binary_data = bytearray(struct.pack('!h', 8))  # 16-bit field count: 8 columns
        binary_data.extend(self.pack_col(0, self.event_row_format, event.element_id))
        binary_data.extend(self.pack_col(1, self.event_row_format, int(event.ts // 1000)))
        binary_data.extend(self.pack_col(2, self.event_row_format, el_num) if el_num is not None else self.encode_null())
        binary_data.extend(self.pack_col(3, self.event_row_format, event.status))
        binary_data.extend(self.pack_col(4, self.event_row_format, event.manager))
        binary_data.extend(self.pack_col(5, self.event_row_format, event.user))
        binary_data.extend(self.encode_null())
        binary_data.extend(self.sanitize_and_encode_string(el_str) if el_str is not None else self.encode_null())
        
        return bytes(binary_data), None

    def _format_dyn_event_line(self, event):
        pass
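For comparison, a plain text-format COPY along the lines of the string-based benchmark would look roughly like this (a minimal sketch; the connection string, the events_text table and its column layout are placeholders, not the ones from the actual benchmark):

import io
import psycopg2

def copy_text_rows(conn, rows):
    # Plain-text COPY FROM STDIN: values are sent as tab-separated strings,
    # which is the wire format the binary path is being compared against
    buf = io.StringIO()
    for element_id, ts, value, status, manager, user in rows:
        buf.write(f"{element_id}\t{ts}\t{value}\t{status}\t{manager}\t{user}\n")
    buf.seek(0)
    with conn.cursor() as cur:
        cur.copy_expert("COPY events_text FROM STDIN", buf)
    conn.commit()

conn = psycopg2.connect("dbname=bench user=bench")  # placeholder connection
copy_text_rows(conn, [(1, 1724227200000, 3.14, 0, 1, 42)])

Running this against the binary flush above on identical data makes the per-row wire sizes of the two formats directly comparable.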
Asked by user129186 (101 rep)
Aug 21, 2024, 08:07 AM