ClickHouse comes with useful tools for performing various tasks. clickhouse-copier is one of them and, as the name suggests, it is used for copying data from one ClickHouse server to another. The servers can belong to the same cluster or to different clusters altogether. This tool requires Apache Zookeeper or clickhouse-keeper to synchronise the copying process across the servers. If the ClickHouse cluster already has a Zookeeper cluster or ClickHouse Keeper for replicated tables, we can reuse that existing keeper. We could also copy data from one server to another using INSERT INTO … SELECT …, but this is not an optimal choice for transferring huge tables. clickhouse-copier can transfer large amounts of data reliably across servers without such bottlenecks.
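For reference, the plain INSERT … SELECT approach mentioned above can be done with the remote() table function. A minimal sketch, assuming the source server is reachable as clickhouse1 on the native port with the default user (host, user and password are placeholders for your environment):

```sql
-- Run on the destination server: pull all rows of default.salary
-- from the remote source server into a local table of the same structure.
INSERT INTO salary
SELECT * FROM remote('clickhouse1:9000', default.salary, 'default', '');
```

This is fine for small tables, but for millions of rows the copier handles retries and parallelism far better.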
The clickhouse-copier is a command-line tool that ships with ClickHouse. It is recommended to run the copier from the server containing the data, but it can be run from any server. In this article, I am going to use a Docker-based setup to illustrate how this tool works. The docker-compose file has the following services:
- Apache Zookeeper
- Two standalone ClickHouse servers
```yaml
version: '3'

services:
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    networks:
      - ch_copier
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
      - ZOOKEEPER_CLIENT_PORT=2181
    ports:
      - "2182:2181"
      - "2888:2888"
      - "3888:3888"

  clickhouse1:
    image: clickhouse/clickhouse-server
    ports:
      - "8002:9000"
      - "9123:8123"
    ulimits:
      nproc: 65535
      nofile:
        soft: 262144
        hard: 262144
    networks:
      - ch_copier
    depends_on:
      - zookeeper

  clickhouse2:
    image: clickhouse/clickhouse-server
    ports:
      - "8003:9000"
      - "9124:8123"
    ulimits:
      nproc: 65535
      nofile:
        soft: 262144
        hard: 262144
    networks:
      - ch_copier
    depends_on:
      - zookeeper

networks:
  ch_copier:
    driver: bridge
```
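With the compose file above saved as docker-compose.yml, the stack can be brought up and a client session opened as follows (the service name clickhouse1 comes from the compose file; this is a sketch for this particular setup):

```shell
# Start Zookeeper and both ClickHouse servers in the background
docker-compose up -d

# Open an interactive clickhouse-client session inside the source server
docker-compose exec clickhouse1 clickhouse-client
```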
Connect to either of the ClickHouse servers running in Docker and create a table using the following SQL statement.
```sql
CREATE TABLE salary
(
    `salary` Nullable(UInt32),
    `month` Nullable(String),
    `name` String
)
ENGINE = MergeTree
ORDER BY name
SETTINGS index_granularity = 8192;
```
Once the table is created, insert some data using the generateRandom table function in ClickHouse.
```sql
INSERT INTO salary SELECT * FROM generateRandom() LIMIT 10000000;
```
This will insert 10 million rows of data.
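A quick sanity check on the source server confirms the row count before we start copying:

```sql
-- Should return 10000000 if the insert above completed
SELECT count() FROM salary;
```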
Once the containers are up and running, get a shell in the container running the ClickHouse server that holds the data to be copied. Create the zookeeper.xml file in the home directory with the following contents.
```xml
<clickhouse>
    <zookeeper>
        <node index="1">
            <host>zookeeper</host>
            <port>2181</port>
        </node>
    </zookeeper>
</clickhouse>
```
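Before running the copier, it can be worth checking that Zookeeper is reachable from this container. One option, assuming nc is available in the container and the ruok four-letter command is whitelisted on the Zookeeper side (the Bitnami image whitelists it by default):

```shell
# Zookeeper answers 'imok' to the 'ruok' four-letter command when healthy
echo ruok | nc zookeeper 2181
```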
Next, create the task.xml file. This file contains all the details related to the copying task: the source and destination clusters, the tables and databases, the destination table engine, etc. Below is a sample task that copies the salary table from one ClickHouse server to another. This XML file is based on the example available here.
```xml
<clickhouse>
    <!-- Configuration of clusters as in an ordinary server config -->
    <remote_servers>
        <source_cluster>
            <!--
                source cluster & destination clusters accept exactly the same
                parameters as parameters for the usual Distributed table
                see https://clickhouse.com/docs/en/engines/table-engines/special/distributed/
            -->
            <shard>
                <internal_replication>false</internal_replication>
                <replica>
                    <host>clickhouse1</host>
                    <port>9000</port>
                    <!--
                    <user>default</user>
                    <password>default</password>
                    <secure>1</secure>
                    -->
                </replica>
            </shard>
        </source_cluster>

        <destination_cluster>
            <shard>
                <internal_replication>false</internal_replication>
                <replica>
                    <host>clickhouse2</host>
                    <port>9000</port>
                    <!--
                    <user>default</user>
                    <password>default</password>
                    <secure>1</secure>
                    -->
                </replica>
            </shard>
        </destination_cluster>
    </remote_servers>

    <!-- How many simultaneously active workers are possible.
         If you run more workers superfluous workers will sleep. -->
    <max_workers>2</max_workers>

    <!-- Setting used to fetch (pull) data from source cluster tables -->
    <settings_pull>
        <readonly>1</readonly>
    </settings_pull>

    <!-- Setting used to insert (push) data to destination cluster tables -->
    <settings_push>
        <readonly>0</readonly>
    </settings_push>

    <!-- Common setting for fetch (pull) and insert (push) operations.
         Also, copier process context uses it.
         They are overlaid by <settings_pull/> and <settings_push/> respectively. -->
    <settings>
        <connect_timeout>3</connect_timeout>
        <!-- Sync insert is set forcibly, leave it here just in case. -->
        <insert_distributed_sync>1</insert_distributed_sync>
    </settings>

    <!-- Copying tasks description.
         You could specify several table tasks in the same task description
         (in the same ZooKeeper node); they will be performed sequentially. -->
    <tables>
        <!-- A table task, copies one table. -->
        <table_salary>
            <!-- Source cluster name (from <remote_servers/> section)
                 and tables in it that should be copied -->
            <cluster_pull>source_cluster</cluster_pull>
            <database_pull>default</database_pull>
            <table_pull>salary</table_pull>

            <!-- Destination cluster name and tables in which the data should be inserted -->
            <cluster_push>destination_cluster</cluster_push>
            <database_push>default</database_push>
            <table_push>salary_copied</table_push>

            <!-- Engine of destination tables.
                 If destination tables have not been created, workers create them
                 using the column definitions from the source tables and the engine
                 definition from here.

                 NOTE: If the first worker starts to insert data and detects that the
                 destination partition is not empty, then the partition will be dropped
                 and refilled. Take this into account if you already have some data in
                 the destination tables. You can directly specify the partitions that
                 should be copied in <enabled_partitions/>; they should be in quoted
                 format, like the partition column of the system.parts table. -->
            <engine>
                ENGINE=MergeTree()
                ORDER BY (name)
            </engine>

            <!-- Sharding key used to insert data to destination cluster -->
            <sharding_key>1</sharding_key>
        </table_salary>
    </tables>
</clickhouse>
```
Given below are the important config sections:

- <remote_servers> – Contains the details of the source and destination servers, including the shard and replica information.
- <max_workers> – Maximum number of workers assigned to the copying task. There is no set formula for choosing this value; it roughly depends on the number of CPU cores and the size of the table.
- <settings> – Common settings such as connection timeouts.
- <tables> – Contains the source and destination table info.
- <cluster_pull> and <cluster_push> – Name the clusters (defined in <remote_servers>) from which data is pulled and to which it is pushed.
- <engine> – Table engine definition for the destination table. If the table does not exist, it is created using the column definitions from the source table and the engine definition given here.
- <sharding_key> – Expression used to generate the sharding key for inserts into the destination cluster. Here we use a single shard with one replica, so it is hard-coded to 1.
Run the copier tool from the directory containing the zookeeper.xml and task.xml files:
```shell
clickhouse-copier --config zookeeper.xml --task-path /copier/salary/1 --task-file task.xml
```
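For long-running copies it may be more convenient to detach the copier. The tool supports a --daemon flag and a --base-dir option for its logs and auxiliary files; a sketch (the log directory is a placeholder):

```shell
# Run the copier as a daemon; logs and auxiliary files go to --base-dir
clickhouse-copier --daemon \
    --config zookeeper.xml \
    --task-path /copier/salary/1 \
    --task-file task.xml \
    --base-dir /var/log/copier
```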
The options accepted by the copier are:

- daemon — Starts clickhouse-copier in daemon mode.
- config — The path to the zookeeper.xml file with the parameters for the connection to Apache Zookeeper.
- task-path — The path to the Zookeeper node. This node is used for syncing clickhouse-copier processes and storing tasks.
- task-file — File with the task configuration.
- task-upload-force — Force upload of task-file even if the Zookeeper node already exists.
- base-dir — The path to logs and auxiliary files on the server where the copier is run (optional).
The copier may take a while to transfer the data. Once the copying is finished, connect to the destination ClickHouse server and verify that the table has been copied.
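On the destination server (clickhouse2 in this setup), the copied table can be checked against the source row count:

```sql
-- Run on the destination server; the count should match the
-- 10 million rows inserted on the source.
SELECT count() FROM salary_copied;
```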