Bulk Inserting Data into Postgres with .NET Core

To insert and process large volumes of data regularly and programmatically, SQL Server developers on .NET have the SqlBulkCopy class. Thankfully, PostgreSQL offers similar functionality through binary COPY import. We will use the Microsoft.EntityFrameworkCore package as the foundation. On top of it, the Npgsql.EntityFrameworkCore.PostgreSQL package provides the Postgres-specific classes; it depends on the Npgsql ADO.NET provider, which is where the NpgsqlConnection class used below comes from.

When inserting a lot of data into a database, it is good practice to bulk insert into a staging table with as few indexes as possible, then move the data to the final destination table. In this scenario, the staging table is a good candidate for an UNLOGGED table that is truncated or dropped after every operation. It is preferable to process and shape the data close to its final stored form before inserting it into the staging table, rather than doing that work between the staging table and the destination table.


CREATE UNLOGGED TABLE public.tbl_staging
( 
    id1 smallint NOT NULL, 
    id2 integer NOT NULL, 
    data1 integer NOT NULL, 
    data2 bigint NOT NULL, 
    CONSTRAINT tbl_staging_pkey PRIMARY KEY(id1, id2) 
) 
TABLESPACE pg_default; 

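For this particular staging table, the primary key is created to support the INSERT ... ON CONFLICT DO UPDATE statement that merges data into the final destination table: it guarantees that each (id1, id2) pair is staged only once, and PostgreSQL rejects an ON CONFLICT DO UPDATE command that would update the same row twice. In some cases, for example when data is inserted without any merge, the primary key may be unnecessary.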


        try
        {
            using (var conn = new NpgsqlConnection(_connectionString))
            {
                await conn.OpenAsync(token);
                token.ThrowIfCancellationRequested();

                using (var tran = conn.BeginTransaction())
                {
                    try
                    {
                        // the column list must match the order of the Write calls below
                        using (var writer = conn.BeginBinaryImport("COPY public.tbl_staging (id1, id2, data1, data2) FROM STDIN (FORMAT BINARY)"))
                        {
                            foreach (var itemToBeInserted in items)
                            {
                                writer.StartRow();
                                writer.Write(itemToBeInserted.id1Value, NpgsqlDbType.Smallint);
                                writer.Write(itemToBeInserted.id2Value, NpgsqlDbType.Integer);
                                writer.Write(itemToBeInserted.data1Value, NpgsqlDbType.Integer);
                                writer.Write(itemToBeInserted.data2Value, NpgsqlDbType.Bigint);
                            }
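                            // completing the import finalizes the COPY; disposing the
                            // writer without completing it cancels the import
                            await writer.CompleteAsync();
                            await writer.CloseAsync();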
                        }
                        await tran.CommitAsync();
                    }
                    catch
                    {
                        await tran.RollbackAsync();
                        throw;
                    }
                }
                await conn.CloseAsync();
            }
        }
        catch (Exception ex)
        {
            //handle exception
        }

NpgsqlConnection is the DbConnection implementation specific to Postgres. Using a transaction ensures that either all of the data is inserted or none of it is. Calling the BeginBinaryImport method on the NpgsqlConnection instance with an appropriate COPY command string begins the operation. Although the NpgsqlBinaryImporter class has async counterparts of the StartRow and Write methods, issuing and awaiting that many async calls in a tight loop does not seem to be the best idea, so the synchronous versions are used inside the loop and only the completion is awaited.
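The binary COPY format is sensitive to types, so each value passed to Write should have a CLR type that corresponds to the NpgsqlDbType given next to it. The import code above never shows the element type of items; the following is a minimal sketch of what it could look like. StagingItem and StagingItemSamples are hypothetical names, and the sample values are made up; only the four property names come from the loop above.

```csharp
using System.Collections.Generic;

// Hypothetical item type. The property names match the import loop;
// the CLR types are chosen to line up with the staging table columns.
public sealed class StagingItem
{
    public short id1Value { get; set; }   // smallint -> NpgsqlDbType.Smallint
    public int id2Value { get; set; }     // integer  -> NpgsqlDbType.Integer
    public int data1Value { get; set; }   // integer  -> NpgsqlDbType.Integer
    public long data2Value { get; set; }  // bigint   -> NpgsqlDbType.Bigint
}

public static class StagingItemSamples
{
    // Made-up rows, only to show the shape of the items collection.
    public static List<StagingItem> Create()
    {
        return new List<StagingItem>
        {
            new StagingItem { id1Value = 1, id2Value = 100, data1Value = 10, data2Value = 5000000000L },
            new StagingItem { id1Value = 1, id2Value = 101, data1Value = 20, data2Value = 6000000000L },
        };
    }
}
```

With a type like this, the compiler already prevents, say, a long from being stored in id1Value, which keeps mismatches between the value and its declared NpgsqlDbType out of the loop.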

All that remains is to merge or move this data into the final destination table by running an appropriate INSERT command. The following code assumes the data will be merged into tbl_target, which has a structure similar to tbl_staging.


        try
        {
            using (var conn = new NpgsqlConnection(_connectionString))
            {
                await conn.OpenAsync(token);
                token.ThrowIfCancellationRequested();

                using (var tran = conn.BeginTransaction())
                {
                    try
                    {
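                        // merge staged rows into the target table; on a key conflict only
                        // dataA is overwritten (add dataB = EXCLUDED.dataB to update both)
                        var commandText =
                            "INSERT INTO public.tbl_target AS t (id1, id2, dataA, dataB) " +
                            "(SELECT id1, id2, data1, data2 FROM public.tbl_staging) " +
                            "ON CONFLICT (id1, id2) " +
                            "DO UPDATE SET " +
                            "dataA = EXCLUDED.dataA;";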

                        // dispose the command once it has run
                        using (var command = conn.CreateCommand())
                        {
                            command.Transaction = tran;
                            command.CommandType = CommandType.Text;
                            command.CommandText = commandText;
                            await command.ExecuteNonQueryAsync(token);
                        }
                        token.ThrowIfCancellationRequested();

                        await tran.CommitAsync();
                    }
                    catch
                    {
                        await tran.RollbackAsync();
                        throw;
                    }
                }
                await conn.CloseAsync();
            }
        }
        catch (Exception ex)
        {
            //handle exception
        }
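
The staging table was described earlier as being truncated or dropped after every operation, but the merge code leaves the staged rows in place. A minimal sketch of the cleanup step follows, assuming TRUNCATE is the preferred option. StagingCleanup and TruncateStagingAsync are hypothetical names; the helper is written against the DbConnection and DbTransaction base classes, which NpgsqlConnection and its transaction derive from.

```csharp
using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;

public static class StagingCleanup
{
    // Assumed statement; DROP TABLE would be the alternative mentioned earlier.
    public const string TruncateSql = "TRUNCATE TABLE public.tbl_staging;";

    // Hypothetical helper: empties the staging table inside the caller's transaction.
    public static async Task TruncateStagingAsync(
        DbConnection conn, DbTransaction tran, CancellationToken token)
    {
        using (var command = conn.CreateCommand())
        {
            command.Transaction = tran;
            command.CommandText = TruncateSql;
            await command.ExecuteNonQueryAsync(token);
        }
    }
}
```

One way to use it is to call await StagingCleanup.TruncateStagingAsync(conn, tran, token); in the merge code just before tran.CommitAsync(). TRUNCATE is transaction-safe in PostgreSQL, so if the merge fails and the transaction is rolled back, the staged rows are still there for a retry.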