Moving Data Between Tables with CTEs in Postgres

If you regularly need to move large amounts of data between two tables, it's better to consider other options such as partitioned tables. However, if this is a one-off task, you can't change the table structure, you need to transform the data while moving records between tables, or for any other reason you have to do it programmatically, you can use something like this:


DO $$
DECLARE
   batch_size int := 1000;           -- rows to move per iteration
   archived_rows int := batch_size;  -- rows moved last iteration; primed so the loop starts
   end_time timestamp(3);
   current_time_utc timestamp(3);
   continue_loop boolean := true;
BEGIN
SELECT now() at time zone 'utc' INTO current_time_utc;
SELECT current_time_utc + INTERVAL '60 second' INTO end_time;

-- keep going while there is time left and the last batch was full
WHILE (end_time > current_time_utc AND archived_rows = batch_size AND continue_loop = true) LOOP
	IF EXISTS (SELECT 1 FROM tbl_source) THEN
		-- pick the next batch of rows to move
		WITH batch AS
		(
		 SELECT  ast.*
		 FROM    tbl_source ast
		 --WHERE select condition here
		 ORDER BY ast.id ASC LIMIT (batch_size)
		)
		-- remove them from the source table
		, deleted_batch AS
		(
		 DELETE FROM tbl_source dast USING batch b WHERE dast.id = b.id
		 RETURNING dast.*
		)
		-- and insert them into the target table, all within one atomic statement
		, inserted_batch AS
		(
		 INSERT INTO tbl_target
		 (SELECT * FROM deleted_batch)
		 RETURNING 1
		)
		SELECT COUNT(1) INTO archived_rows FROM inserted_batch;

		-- one second delay for breathing room
		PERFORM pg_sleep(1);
		SELECT now() at time zone 'utc' INTO current_time_utc;
	ELSE
		continue_loop := false;
	END IF;
END LOOP;
END $$;

The above code assumes that tbl_source and tbl_target have exactly the same column structure and that the column id is an ever-increasing primary key on tbl_source. If tbl_source has a different primary key, or only a subset of rows should be moved, the batch and deleted_batch CTEs have to be modified accordingly, as in the sketch below.
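For example, assuming a hypothetical composite key (key_a, key_b) and a hypothetical created_at cutoff as the selection condition, the first two CTEs might become something like this (only a sketch; the rest of the script stays the same):

		WITH batch AS
		(
		 SELECT  ast.*
		 FROM    tbl_source ast
		 WHERE   ast.created_at < now() - INTERVAL '30 days'
		 ORDER BY ast.key_a ASC, ast.key_b ASC LIMIT (batch_size)
		)
		, deleted_batch AS
		(
		 -- match on the full composite key instead of id
		 DELETE FROM tbl_source dast USING batch b
		 WHERE dast.key_a = b.key_a AND dast.key_b = b.key_b
		 RETURNING dast.*
		)

If a selection condition is used, the EXISTS check at the top of the loop should use the same condition, so the script stops promptly instead of sleeping through an extra empty iteration.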

To avoid locking the tables for the duration of the operation, the records are processed in batches instead of being moved in one pass. The code runs in a loop for 60 seconds or until no more records exist in tbl_source. Each iteration selects the first 1000 rows from tbl_source, and the CTE chain ensures that the DELETE and INSERT for those 1000 records execute atomically as a single statement. The optimal batch size is different in every case, so you should experiment to tune it for your needs; a quick way to do that is sketched below.
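If you want to experiment with batch sizes before running this against real data, a minimal sandbox could look like the following (tbl_source, tbl_target, and the payload column are just placeholders):

CREATE TABLE tbl_source (
    id bigserial PRIMARY KEY,
    payload text
);

-- identical column structure, as the script assumes
CREATE TABLE tbl_target (LIKE tbl_source INCLUDING ALL);

-- seed some rows to move
INSERT INTO tbl_source (payload)
SELECT 'row ' || g FROM generate_series(1, 10000) AS g;

With this in place you can run the block above, vary batch_size, and watch how quickly SELECT count(*) FROM tbl_source drains.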

If tbl_source and tbl_target have different column structures, the INSERT statement has to be changed accordingly. It is also possible to merge records of tbl_source into tbl_target by using an INSERT ... ON CONFLICT DO UPDATE statement. For the ON CONFLICT clause to work, the column or columns the data is being merged on must be covered by a unique index (or unique constraint) on tbl_target. The following code demonstrates this:


DO $$
DECLARE
   batch_size int := 1000;           -- rows to move per iteration
   archived_rows int := batch_size;  -- rows moved last iteration; primed so the loop starts
   end_time timestamp(3);
   current_time_utc timestamp(3);
   continue_loop boolean := true;
BEGIN
SELECT now() at time zone 'utc' INTO current_time_utc;
SELECT current_time_utc + INTERVAL '60 second' INTO end_time;

WHILE (end_time > current_time_utc AND archived_rows = batch_size AND continue_loop = true) LOOP
	IF EXISTS (SELECT 1 FROM tbl_source) THEN
		WITH batch AS
		(
		 SELECT  ast.*
		 FROM    tbl_source ast
		 --WHERE select condition here
		 ORDER BY ast.id ASC LIMIT (batch_size)
		)
		, deleted_batch AS
		(
		 DELETE FROM tbl_source dast USING batch b WHERE dast.id = b.id
		 RETURNING dast.*
		)
		, upserted_batch AS
		(
		 -- columns are mapped here if the two tables differ (col1 -> col_a, etc.)
		 INSERT INTO tbl_target AS t (id, col_a, col_b, col_c)
		 (SELECT id, col1, col2, col3 FROM deleted_batch)
		 -- id must be covered by a unique index on tbl_target
		 ON CONFLICT (id)
		 DO UPDATE SET
		 col_a = EXCLUDED.col_a,            -- overwrite with the incoming value
		 col_b = t.col_b + EXCLUDED.col_b,  -- accumulate values
		 col_c = t.col_c + 1                -- increment a counter on each merge
		 RETURNING 1
		)
		SELECT COUNT(1) INTO archived_rows FROM upserted_batch;

		-- one second delay for breathing room
		PERFORM pg_sleep(1);
		SELECT now() at time zone 'utc' INTO current_time_utc;
	ELSE
		continue_loop := false;
	END IF;
END LOOP;
END $$;
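
One more note on the merge variant: ON CONFLICT (id) can only detect conflicts through a unique index, so if id is not already the primary key of tbl_target you would need to create one first (the index name here is just a placeholder):

-- required for ON CONFLICT (id) to detect conflicting rows
CREATE UNIQUE INDEX IF NOT EXISTS ux_tbl_target_id ON tbl_target (id);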