Avatar

Copy data between databases: a Golang kata

← Back to list
Posted on 22.12.2022
Image by Joshua Sortino on Unsplash
Refill!

This time I had a task of transferring some data between two Postgres instances, and I decided to go with a script.

Having a script for this kind of task is normally considered overkill. If one wishes to copy data from one table to another, they just use the select-insert SQL syntax. Alternatively, pg_dump and psql can be used to copy entire tables.

In my case I needed to copy data between two database instances, potentially with some intricate logic involved, so the thing was more similar to a migration script. Here is what I ended up with, give or take some styling and omitting all business-related stuff.

Copier class

The core class is presented in the listing below. The code is quite self-explanatory.

👉 📃  internal/copier/copier.go
package copier
import (
"context"
"database/sql"
"fmt"
"github.com/gocraft/dbr/v2"
"github.com/gocraft/dbr/v2/dialect"
"github.com/google/uuid"
"github.com/lib/pq"
"github.com/luna-duclos/instrumentedsql"
)
// TableName is the name of the table the elements are read from in the source
// database and written to in the destination database.
const TableName = "elements"
// Options carries the connection parameters for both database instances:
// the source that is read from and the destination that is written to.
type Options struct {
	// Source instance.
	SrcDBPort     int32
	SrcDBUser     string
	SrcDBPassword string

	// Destination instance.
	DstDBPort     int32
	DstDBUser     string
	DstDBPassword string
}
// Element is one row of the copied table.
//
// The values are carried over as-is and never inspected, so the field types
// could just as well be interface{}; strings are used for readability.
type Element struct {
	ID    string `db:"id"`
	Title string `db:"title"`
}
// Copier copies elements from a source database to a destination database,
// using the connection parameters held in its options.
type Copier struct {
	// options holds the connection parameters of both instances.
	options *Options
}
// New returns a Copier that will connect using the given options.
// The options pointer is stored as-is, not copied.
func New(options *Options) *Copier {
	c := new(Copier)
	c.options = options
	return c
}
// CopyElements copies the elements with the given IDs from the source database
// to the destination database.
//
// All inserts run inside a single transaction on the destination, so either
// every found element is copied or, on any error, nothing is. IDs that do not
// exist in the source are reported on stdout and skipped; they are not errors.
// Progress is printed to stdout. The returned error is wrapped with the step
// (and, where applicable, the source ID) that failed.
func (c *Copier) CopyElements(ids []string) error {
	srcDB, dstDB, err := c.makeConnections()
	if err != nil {
		return fmt.Errorf("connecting to databases: %w", err)
	}
	defer srcDB.Close()
	defer dstDB.Close()

	srcSession := srcDB.NewSession(nil)
	dstSession := dstDB.NewSession(nil)

	// The destination side is transactional: if anything goes wrong the
	// deferred rollback undoes every insert made so far.
	tx, err := dstSession.Begin()
	if err != nil {
		return fmt.Errorf("starting destination transaction: %w", err)
	}
	defer tx.RollbackUnlessCommitted()

	ctx := context.Background()
	processed := 0
	for _, id := range ids {
		element, err := c.getElementByID(ctx, srcSession, id)
		if err != nil {
			return fmt.Errorf("reading element %q: %w", id, err)
		}
		// A missing element may surface either as a nil pointer or as a
		// zero-value Element; guard against both before dereferencing.
		if element == nil || element.ID == "" {
			fmt.Printf("Could not read element with id %s: element not found\n", id)
			continue
		}

		fmt.Printf("Copying element %s\n", id)
		// Report the source id on failure: saveElement replaces element.ID
		// with a freshly generated one, which would be meaningless to the user.
		if err := c.saveElement(ctx, tx, element); err != nil {
			return fmt.Errorf("copying element %q: %w", id, err)
		}
		processed++
	}

	if err := tx.Commit(); err != nil {
		return fmt.Errorf("committing destination transaction: %w", err)
	}

	fmt.Printf("Done. Items processed: %d\n", processed)
	return nil
}
// getElementByID returns the element with the given ID from the source database.
//
// The row is loaded into a local Element value rather than into a nil pointer,
// so when no row matches the caller receives a non-nil, zero-value Element
// (empty ID) instead of a nil pointer it might dereference. Absence of a row
// is therefore not reported as an error; only query failures are.
// NOTE(review): this relies on dbr's Load leaving the destination untouched
// when the result set is empty — confirm against the dbr version in use.
func (c *Copier) getElementByID(ctx context.Context, session *dbr.Session, id string) (element *Element, err error) {
	var found Element
	_, err = session.
		Select("*").
		From(TableName).
		Where("id = ?", id).
		LoadContext(ctx, &found)
	if err != nil {
		return nil, err
	}
	return &found, nil
}
// saveElement inserts the given element into the destination database within
// the supplied transaction.
//
// The element's ID is overwritten in place with a newly generated UUID before
// the insert, so the copy cannot collide with an existing primary key. The
// caller's Element is therefore modified.
func (c *Copier) saveElement(ctx context.Context, session *dbr.Tx, element *Element) error {
	element.ID = uuid.New().String()

	insert := session.
		InsertInto(TableName).
		Columns("id", "title").
		Record(element)

	return insert.LoadContext(ctx, &element.ID)
}
// makeConnections opens two connections: one to the database that is read
// from and one to the database that is written to.
//
// The instrumented Postgres driver is registered on first use only.
// sql.Register panics if the same name is registered twice, so the list of
// already registered drivers is consulted first; this makes it safe to call
// CopyElements more than once in a process.
// NOTE(review): the check-then-register sequence is not synchronized; guard it
// if Copier is ever used from several goroutines at once.
//
// On failure no connection is left open: if the destination cannot be reached,
// the already opened source connection is closed before returning.
func (c *Copier) makeConnections() (srcDBConn *dbr.Connection, dstDBConn *dbr.Connection, err error) {
	const driverName = "instrumented-postgres"

	registered := false
	for _, name := range sql.Drivers() {
		if name == driverName {
			registered = true
			break
		}
	}
	if !registered {
		sql.Register(driverName, instrumentedsql.WrapDriver(&pq.Driver{}))
	}

	// Connection to the instance we read from.
	srcDB, err := c.makeConnection(c.makeDBInfo(c.options.SrcDBPort, c.options.SrcDBUser, c.options.SrcDBPassword))
	if err != nil {
		return nil, nil, err
	}

	// Connection to the instance we write to.
	dstDB, err := c.makeConnection(c.makeDBInfo(c.options.DstDBPort, c.options.DstDBUser, c.options.DstDBPassword))
	if err != nil {
		srcDB.Close()
		return nil, nil, err
	}

	return srcDB, dstDB, nil
}
// makeDBInfo returns a keyword/value connection string for the instance
// listening on the given local port.
//
// The user and password are single-quoted and escaped, so values that are
// empty or contain spaces, single quotes or backslashes do not corrupt the
// connection string. Host, database name and sslmode are fixed.
func (c *Copier) makeDBInfo(port int32, user string, password string) string {
	return fmt.Sprintf(
		"host=localhost port=%d user=%s password=%s dbname=test sslmode=disable",
		port,
		quoteDSNValue(user),
		quoteDSNValue(password),
	)
}

// quoteDSNValue wraps v in single quotes, prefixing every single quote and
// backslash inside it with a backslash.
func quoteDSNValue(v string) string {
	quoted := make([]byte, 0, len(v)+2)
	quoted = append(quoted, '\'')
	for i := 0; i < len(v); i++ {
		if v[i] == '\'' || v[i] == '\\' {
			quoted = append(quoted, '\\')
		}
		quoted = append(quoted, v[i])
	}
	quoted = append(quoted, '\'')
	return string(quoted)
}
// makeConnection opens a connection using the provided connection string and
// wraps it in a dbr.Connection configured for the PostgreSQL dialect.
//
// sql.Open alone does not prove the database is reachable, so the connection
// is pinged before it is returned. If the ping fails the handle is closed so
// it is not leaked, and the ping error is returned.
func (c *Copier) makeConnection(dbInfo string) (*dbr.Connection, error) {
	// Open a handle using the instrumented driver.
	dbConnection, err := sql.Open("instrumented-postgres", dbInfo)
	if err != nil {
		return nil, fmt.Errorf("opening database connection: %w", err)
	}

	// Check that the connection is really there.
	if err := dbConnection.Ping(); err != nil {
		// Best-effort cleanup; the ping error is the one worth reporting.
		_ = dbConnection.Close()
		return nil, fmt.Errorf("pinging database: %w", err)
	}

	// Wrap the handle for use with dbr.
	return &dbr.Connection{
		DB:            dbConnection,
		Dialect:       dialect.PostgreSQL,
		EventReceiver: &dbr.NullEventReceiver{},
	}, nil
}

Helpers

There were also a couple of helpers to prompt the user for passwords and to read CSV data.

👉 📃  internal/util/util.go
package util
import (
"encoding/csv"
"fmt"
"os"
"golang.org/x/term"
)
// PromptUser prints the prompt followed by ": " and reads a line from the
// terminal attached to standard input using term.ReadPassword, then prints a
// newline.
//
// It returns the text that was read. If reading fails (for example because
// standard input is not a terminal) the failure is reported on stderr and an
// empty string is returned.
func PromptUser(prompt string) string {
	fmt.Print(prompt + ": ")
	// Use the real stdin descriptor instead of assuming it is 0.
	input, err := term.ReadPassword(int(os.Stdin.Fd()))
	fmt.Print("\n")
	if err != nil {
		fmt.Fprintf(os.Stderr, "could not read input: %v\n", err)
		return ""
	}
	return string(input)
}
// ReadCSV reads the CSV file at filePath and returns the first field of every
// record, in file order.
//
// It returns an error if the file cannot be opened or is not valid CSV.
// A file with no records yields a nil slice.
func ReadCSV(filePath string) ([]string, error) {
	f, err := os.Open(filePath)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	rows, err := csv.NewReader(f).ReadAll()
	if err != nil {
		return nil, err
	}

	var firstColumn []string
	for i := range rows {
		firstColumn = append(firstColumn, rows[i][0])
	}
	return firstColumn, nil
}

Putting everything together

To make everything neat and closer to an actual application, I wrapped the whole thing with a CLI command manager.

👉 📃  cmd/main.go
package main
import (
"fmt"
"os"
"strconv"
"copydata/internal/copier"
"copydata/internal/util"
"github.com/urfave/cli/v2"
)
// main wires the "copy" command into a CLI application and runs it.
//
// Failures are reported on stderr and the process exits with status 1,
// instead of panicking with a stack trace on ordinary user errors.
func main() {
	app := &cli.App{
		Name:  "copy-data",
		Usage: "Just another Golang kata",
		Commands: []*cli.Command{
			{
				Name:    "copy",
				Aliases: []string{"c"},
				Usage:   "Copy data from one db instance to another",
				Flags: []cli.Flag{
					&cli.StringFlag{
						Name:  "file",
						Usage: "File containing a list of IDs",
					},
					&cli.StringFlag{
						Name:  "src-port",
						Value: "54321",
						Usage: "Source instance port",
					},
					&cli.StringFlag{
						Name:  "src-user",
						Value: "test",
						Usage: "Source database user",
					},
					&cli.StringFlag{
						Name:  "dst-port",
						Value: "54322",
						Usage: "Destination instance port",
					},
					&cli.StringFlag{
						Name:  "dst-user",
						Value: "test",
						Usage: "Destination database user",
					},
				},
				Action: func(cCtx *cli.Context) error {
					// Validate everything that can be validated before
					// bothering the user with password prompts.
					filePath := cCtx.String("file")
					if filePath == "" {
						return fmt.Errorf("the --file flag is required")
					}
					srcPort, err := parsePort("src-port", cCtx.String("src-port"))
					if err != nil {
						return err
					}
					dstPort, err := parsePort("dst-port", cCtx.String("dst-port"))
					if err != nil {
						return err
					}
					idList, err := util.ReadCSV(filePath)
					if err != nil {
						return fmt.Errorf("reading IDs from %q: %w", filePath, err)
					}

					srcPassword := util.PromptUser("Source database password")
					dstPassword := util.PromptUser("Destination database password")

					copierInstance := copier.New(&copier.Options{
						SrcDBPort:     srcPort,
						SrcDBUser:     cCtx.String("src-user"),
						SrcDBPassword: srcPassword,
						DstDBPort:     dstPort,
						DstDBUser:     cCtx.String("dst-user"),
						DstDBPassword: dstPassword,
					})
					return copierInstance.CopyElements(idList)
				},
			},
		},
	}

	if err := app.Run(os.Args); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}

// parsePort converts the value of the named flag into a TCP port number.
// Parsing with a 16-bit size rejects anything above 65535; zero is rejected
// explicitly.
func parsePort(flagName, value string) (int32, error) {
	port, err := strconv.ParseUint(value, 10, 16)
	if err != nil {
		return 0, fmt.Errorf("parsing --%s: %w", flagName, err)
	}
	if port == 0 {
		return 0, fmt.Errorf("parsing --%s: port must be between 1 and 65535", flagName)
	}
	return int32(port), nil
}

Well, there is actually not much to add to this. It is a pretty straightforward example of how to work with databases in Go and create a simple CLI application. As usual, the code is available here. I hope the article was helpful to whoever reads it :)

A bonus for GCP users

If you are using GCP, there is a lovely tool called cloud_sql_proxy by Google. It allows forwarding a connection to a Cloud SQL instance to a local TCP port. All you need is to have a service account armed and ready, and boom: it saves a lot of time and hassle.

$
cloud_sql_proxy -instances=<PROJECT_NAME>:<REGION>:<CLOUDDB_INSTANCE_NAME>=tcp:<LOCAL_PORT>

Avatar

Sergei Gannochenko

Business-oriented fullstack engineer, in ❤️ with Tech.
React, Node, Go, Docker, AWS, Jamstack.
15+ years in dev.