#!/usr/bin/perl
#------------------------------------------------------------------------------
#
# pg_dumpbinary - Dump a PostgreSQL database using binary format for data.
#
# This program is open source, licensed under the PostgreSQL license.
# For license terms, see the LICENSE file.
#
# Author: Gilles Darold
# Copyright: (C) 2019 Gilles Darold - All rights reserved.
#------------------------------------------------------------------------------
use strict;

use Getopt::Long  qw(:config bundling no_ignore_case_always);
use POSIX qw(locale_h sys_wait_h _exit);
use Time::HiRes qw/usleep/;
use File::Spec qw/ tmpdir /;
use File::Temp qw/ tempfile /;
use DBI;
use DBD::Pg;

my $VERSION = '1.0';
my $PROGRAM = 'pg_dumpbinary';

my $DBNAME = '';
my $DBUSER = '';
my $DBHOST = '';
my $DBPORT = 5432;
my @INC_SCHEMA = ();
my @EXC_SCHEMA = ();
my @INC_TABLE = ();
my @EXC_TABLE = ();

my $TMP_DIR   = File::Spec->tmpdir() || '/tmp';
my $PGDUMP    = 'pg_dump';
my $PGRESTORE = 'pg_restore';
my $PSQL      = 'psql';
my $PG_OPT    = '';
my $PG_FILTER = '';
my $JOBS      = 1;
my $VER       = 0;
my $HELP      = 0;

my $interrupt    = 0;
my $child_count  = 0;
my %RUNNING_PIDS = ();
my $dbh          = undef;

my @post_conn = (
	"SELECT pg_catalog.set_config('search_path', '', false);",
	"SET client_encoding TO 'UTF8';",
	"SET DATESTYLE = ISO;",
	"SET INTERVALSTYLE = POSTGRES;",
	"SET extra_float_digits TO 3;",
	"SET synchronize_seqscans TO off;",
	"SET statement_timeout = 0;",
	"SET lock_timeout = 0;",
	"SET idle_in_transaction_session_timeout = 0;",
	"SET row_security = off;",
	"BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ, READ ONLY;"
);

####
# Method used to fork as many child as wanted,
# must be declared at top if the program.
####
sub spawn
{
	my $coderef = shift;

	unless (@_ == 0 && $coderef && ref($coderef) eq 'CODE')
	{
		print "usage: spawn CODEREF";
		exit 0;
	}

	my $pid;
	if (!defined($pid = fork)) {
		print STDERR "Error: cannot fork: $!\n";
		return;
	} elsif ($pid) {
		$RUNNING_PIDS{$pid} = $pid;
		return; # the parent
	}
	# the child -- go spawn
	$< = $>;
	$( = $); # suid progs only

	exit &$coderef();
}


$SIG{'CHLD'} = 'DEFAULT';

# With multiprocess we need to wait for all children
sub wait_child
{
	my $sig = shift;

	$interrupt = 1;

	print STDERR "Received terminating signal ($sig)\n";
	if ($^O !~ /MSWin32|dos/i) {
		1 while wait != -1;
		$SIG{INT} = \&wait_child;
		$SIG{TERM} = \&wait_child;
	}
	$dbh->disconnect() if (defined $dbh);

	_exit(0);
}
$SIG{INT} = \&wait_child;
$SIG{TERM} = \&wait_child;

$| = 1;

GetOptions(
	"d|database=s"       => \$DBNAME,
	"h|host=s"           => \$DBHOST,
	"j|jobs=s"           => \$JOBS,
	"n|schema=s"         => \@INC_SCHEMA,
	"N|exclude-schema=s" => \@EXC_SCHEMA,
	"p|port=i"           => \$DBPORT,
	"t|table=s"          => \@INC_TABLE,
	"T|exclude-table=s"  => \@EXC_TABLE,
	"u|user=s"           => \$DBUSER,
	"v|version!"         => \$VER,
	"help!"              => \$HELP,
);

if ($VER)
{
	print "$PROGRAM Version: v$VERSION\n";
	exit 0;
}

&usage if ($HELP);

if (!$DBNAME)
{
	&usage("ERROR: you must specify a database to dump, see -d option\n");
}

# Set pg_dump/psql option
if ($DBHOST)
{
	$PG_OPT .= " -h $DBHOST";
}
if ($DBPORT)
{
	$PG_OPT .= " -p $DBPORT";
}
if ($DBUSER)
{
	$PG_OPT .= " -U $DBUSER";
}

# Set schema/table filter options
&set_filter_option();

# Set default output directory name
my $CURDATE = get_current_date();
my $OUTDIR = $ARGV[0] || 'binary_bkup_' . $CURDATE;

# Create output directory
if (!-d $OUTDIR)
{
	mkdir $OUTDIR or die "FATAL: can not create directory $OUTDIR, $!\n";
}

print "Database $DBNAME dump created at ", `date --rfc-3339=seconds`;

# Set DBD::Pg options and connect to the database
# to create the snapshot for a consistent backup
my $dbpg_opt = '';
$dbpg_opt .= ";port=$DBPORT" if ($DBPORT);
$dbpg_opt .= ";host=$DBHOST" if ($DBHOST);
$dbh = DBI->connect("dbi:Pg:dbname=$DBNAME$dbpg_opt", $DBUSER, '', {AutoCommit => 1, InactiveDestroy => 1});
if (not defined $dbh)
{
	die "FATAL: can not connect to database $DBNAME\n";
}

# Verify that we have a PG >= 9.2
my $sth = $dbh->prepare("SELECT version()") or die "FATAL: " . $dbh->errstr . "\n";
$sth->execute or die "FATAL: " . $dbh->errstr . "\n";
my $pgversion = 0;
while (my $row = $sth->fetch)
{
	if ($row->[0] =~ /^[^\s]+ (\d+\.\d+)/)
	{
		$pgversion = $1;
		$pgversion =~ s/\..*// if ($pgversion >= 10);
	}
}
$sth->finish;

# No way before PG 9.2
if ($pgversion < 9.2)
{
	$dbh->disconnect();
	die "FATAL: pg_dumpbinary require PG >= 9.2!\n";
}

# Be sure that the host is not in recovery mode or that we have PG >= 10
$sth = $dbh->prepare("SELECT pg_catalog.pg_is_in_recovery()") or die "FATAL: " . $dbh->errstr . "\n";
$sth->execute or die "FATAL: " . $dbh->errstr . "\n";
my $is_in_recovery = 0;
while (my $row = $sth->fetch)
{
	$is_in_recovery = $row->[0];
}
$sth->finish;
if ($is_in_recovery && $pgversion < 10)
{
	$dbh->disconnect();
	die "FATAL: PostgreSQL server is in recovery mode, this is not supported before PG 10.\n";
}

# Create a snapshot that will be reused with pg_dump for DDL and psql to retrieve data
foreach my $q (@post_conn)
{
	next if ($pgversion < 9.6 and $q =~ /SET idle_in_transaction_session_timeout/);
	$dbh->do("$q") or die "FATAL: " . $dbh->errstr . "\n";
}
$sth = $dbh->prepare("SELECT pg_catalog.pg_export_snapshot()") or die "FATAL: " . $dbh->errstr . "\n";
$sth->execute or die "FATAL: " . $dbh->errstr . "\n";
my $snapshot = ''; 
while (my $row = $sth->fetch)
{
	$snapshot = $row->[0];
}
$sth->finish;
if (!$snapshot)
{
	$dbh->disconnect();
	die "FATAL: can not obtain a snapshot, please check what's going wrong.\n";
}

# Dump database pre-data section
print "Dumping pre data section\n";
`$PGDUMP --snapshot='$snapshot' $PG_OPT $PG_FILTER -d $DBNAME -Fc --section=pre-data -f $OUTDIR/pre-data.dmp`;
if ($?)
{
	$dbh->disconnect();
	die "ERROR: pg_dump error to dump pre-data section.\n";
}

# Retrieve list of schema.tables to restore.
my %tbl_list = get_table_list();

# Distribute all tables equally between all processes.
my %distributed_table = ();
my $proc = 1;
foreach my $s (sort keys %tbl_list)
{
	for (my $i = 0; $i <= $#{$tbl_list{$s}}; $i++)
	{
		my $file = quotemeta($s) . '.' . quotemeta($tbl_list{$s}[$i]);
		push(@{ $distributed_table{$proc} }, "\\o");
		push(@{ $distributed_table{$proc} }, "\\echo Dumping data from table $s.$tbl_list{$s}[$i]");
		push(@{ $distributed_table{$proc} }, qq{\\o |gzip -c - > "$OUTDIR/data-$file.bin.gz"});
		push(@{ $distributed_table{$proc} }, qq{COPY "$s"."$tbl_list{$s}[$i]" TO stdout WITH (FORMAT binary);});
		$proc++;
		$proc = 1 if ($proc > $JOBS);
	}
}

# Fork a process to call psql to execute COPY in gziped binary format
# with the per process dedicated tables to export.
foreach my $p (sort keys %distributed_table)
{
	spawn sub
	{
		my ($fh, $filename) = tempfile('pg_dumpbinXXXX', SUFFIX => '.tmp', DIR => $TMP_DIR, UNLINK => 1 );
		if (defined $fh)
		{
			print $fh "\\o /dev/null\n";
			foreach my $q (@post_conn)
			{
				next if ($pgversion < 9.6 and $q =~ /SET idle_in_transaction_session_timeout/);
				print $fh "$q\n";
			}
			print $fh "SET TRANSACTION SNAPSHOT '$snapshot';\n";
			print $fh "\\o\n";
			map { print $fh "$_\n"; } @{ $distributed_table{$p} };
			close($fh);
			`$PSQL $PG_OPT -d $DBNAME -f $filename`;

			# Remove empty gzipped data file
			foreach my $t (@{ $distributed_table{$p} } )
			{
				# extract filename part from COPY command
				if ($t =~ /gzip -c - > "([^\"]+)"$/)
				{
					if ((stat("$1"))[7] == 35)
					{
						unlink($1);
					}
				}
			}
		}
		unlink($filename);
	};
}

# Wait for all child processes to localdie
while (scalar keys %RUNNING_PIDS > 0)
{
        my $kid = waitpid(-1, WNOHANG);
        if ($kid > 0)
	{
                delete $RUNNING_PIDS{$kid};
        }
	usleep(50000);
}

# Dump database post-data section
if (!$interrupt)
{
	print "Dumping post data section\n";
	`$PGDUMP --snapshot="$snapshot" $PG_OPT $PG_FILTER -d $DBNAME -Fc --section=post-data -f $OUTDIR/post-data.dmp`;
	if ($?)
	{
		$dbh->disconnect();
		die "ERROR: pg_dump error for post-data section.\n";
	}
}

# Disconnect from snapshot connection
$dbh->disconnect;

exit(1) if ($interrupt);

print "Dump ended at ", `date --rfc-3339=seconds`;

exit 0;

#----------------------------------------------------------------------------------

####
# Show program usage
####
sub usage
{
	my $msg = shift();

	print qq{
Program used to dump a PostgreSQL database with data dumped in binary
format. The resulting dumps can be restored using pg_restorebinary.

usage: pg_dumpbinary -d dbname [options] backup_name

    backup_name   output directory where dump will be saved. Default
                  directory name is binary_bkup_YYYY-MM-DDTHH:MM:SS
		  when no output directory is provided.
options:

  -d, --database DBNAME        database to dump
  -h, --host HOSTNAME          database server host or socket directory
  -j, --job NUM                use this many parallel jobs to dump
  -n, --schema SCHEMA          dump the named schema(s) only
  -N, --exclude-schema SCHEMA  do NOT dump the named schema(s)
  -p, --port PORT              database server port number, default: 5432
  -t, --table TABLE            dump named relation
  -T, --exclude-table TABLE    do NOT dump the named table
  -u, --user NAME              connect as specified database user
  -v, --version                show program version
  --help                       show usage

$msg
};
	exit 0;
}

####
# Return a hash of array corresponding to the
# list of tables per schema of the database.
####
sub get_table_list
{
	my %tb_lst = ();

	my @list =  `$PGRESTORE -l $OUTDIR/pre-data.dmp | grep " TABLE "`;
	chomp(@list);

	foreach my $l (@list)
	{
		# 198; 1259 57159 TABLE public T1 gilles
		next if ($l =~ /\d+: \d+ \d+ TABLE /);
		my @inf = split(/\s/, $l);
		next if ($#inf != 6);
		push( @{ $tb_lst{$inf[4]} }, $inf[5]);
	}

	return %tb_lst;
}

####
# Return the date in ISO 8601 format minus the timezone part
####
sub get_current_date
{
	my $curdate = `date -Iseconds`;
	chomp($curdate);
	$curdate =~ s/[+-]\d{2}:\d{2}$//;

	return $curdate;
}

####
# Set schema/table option to use with pg_dump
####
sub set_filter_option
{

	if ($#INC_SCHEMA >= 0)
	{
		foreach my $s (@INC_SCHEMA)
		{
			$PG_FILTER .= " -n '$s'";
		}
	}

	if ($#EXC_SCHEMA >= 0)
	{
		foreach my $s (@EXC_SCHEMA)
		{
			$PG_FILTER .= " -N '$s'";
		}
	}

	if ($#INC_TABLE >= 0)
	{
		foreach my $s (@INC_TABLE)
		{
			$PG_FILTER .= " -t '$s'";
		}
	}

	if ($#EXC_TABLE >= 0)
	{
		foreach my $s (@EXC_TABLE)
		{
			$PG_FILTER .= " -T '$s'";
		}
	}
}

