[pve-devel] [PATCH v2 pve-manager 01/22] pvesr: add pve storage replication tool

Dietmar Maurer dietmar at proxmox.com
Mon May 29 11:29:45 CEST 2017


Just added code to configure jobs. Replication itself is not
implemented.

Signed-off-by: Dietmar Maurer <dietmar at proxmox.com>
---
 PVE/API2/Cluster.pm           |   7 ++
 PVE/API2/Makefile             |   2 +
 PVE/API2/Nodes.pm             |   7 ++
 PVE/API2/Replication.pm       |  93 ++++++++++++++++
 PVE/API2/ReplicationConfig.pm | 217 +++++++++++++++++++++++++++++++++++++
 PVE/CLI/Makefile              |   2 +-
 PVE/CLI/pvesr.pm              | 150 ++++++++++++++++++++++++++
 PVE/Makefile                  |   1 +
 PVE/Replication.pm            | 242 ++++++++++++++++++++++++++++++++++++++++++
 bin/Makefile                  |   2 +-
 bin/pvesr                     |   8 ++
 11 files changed, 729 insertions(+), 2 deletions(-)
 create mode 100644 PVE/API2/Replication.pm
 create mode 100644 PVE/API2/ReplicationConfig.pm
 create mode 100644 PVE/CLI/pvesr.pm
 create mode 100644 PVE/Replication.pm
 create mode 100644 bin/pvesr

diff --git a/PVE/API2/Cluster.pm b/PVE/API2/Cluster.pm
index 6ce86de4..0e94e9ff 100644
--- a/PVE/API2/Cluster.pm
+++ b/PVE/API2/Cluster.pm
@@ -22,10 +22,16 @@ use PVE::RPCEnvironment;
 use PVE::JSONSchema qw(get_standard_option);
 use PVE::Firewall;
 use PVE::API2::Firewall::Cluster;
+use PVE::API2::ReplicationConfig;
 
 use base qw(PVE::RESTHandler);
 
 __PACKAGE__->register_method ({
+    subclass => "PVE::API2::ReplicationConfig",
+    path => 'replication',
+});
+
+__PACKAGE__->register_method ({
     subclass => "PVE::API2::ClusterConfig",
     path => 'config',
 });
@@ -82,6 +88,7 @@ __PACKAGE__->register_method ({
 	    { name => 'log' },
 	    { name => 'options' },
 	    { name => 'resources' },
+	    { name => 'replication' },
 	    { name => 'tasks' },
 	    { name => 'backup' },
 	    { name => 'ha' },
diff --git a/PVE/API2/Makefile b/PVE/API2/Makefile
index 4dfcbb56..86d75d36 100644
--- a/PVE/API2/Makefile
+++ b/PVE/API2/Makefile
@@ -1,6 +1,8 @@
 include ../../defines.mk
 
 PERLSOURCE = 			\
+	Replication.pm		\
+	ReplicationConfig.pm	\
 	Ceph.pm			\
 	APT.pm			\
 	Subscription.pm		\
diff --git a/PVE/API2/Nodes.pm b/PVE/API2/Nodes.pm
index a45ca6db..885cf116 100644
--- a/PVE/API2/Nodes.pm
+++ b/PVE/API2/Nodes.pm
@@ -40,6 +40,7 @@ use PVE::API2::VZDump;
 use PVE::API2::APT;
 use PVE::API2::Ceph;
 use PVE::API2::Firewall::Host;
+use PVE::API2::Replication;
 use Digest::MD5;
 use Digest::SHA;
 use PVE::API2::Disks;
@@ -113,6 +114,11 @@ __PACKAGE__->register_method ({
 });
 
 __PACKAGE__->register_method ({
+    subclass => "PVE::API2::Replication",
+    path => 'replication',
+});
+
+__PACKAGE__->register_method ({
     name => 'index', 
     path => '', 
     method => 'GET',
@@ -147,6 +153,7 @@ __PACKAGE__->register_method ({
 	    { name => 'tasks' },
 	    { name => 'rrd' }, # fixme: remove?
 	    { name => 'rrddata' },# fixme: remove?
+	    { name => 'replication' },
 	    { name => 'vncshell' },
 	    { name => 'spiceshell' },
 	    { name => 'time' },
diff --git a/PVE/API2/Replication.pm b/PVE/API2/Replication.pm
new file mode 100644
index 00000000..eaa7e571
--- /dev/null
+++ b/PVE/API2/Replication.pm
@@ -0,0 +1,93 @@
+package PVE::API2::Replication;
+
+use warnings;
+use strict;
+
+use PVE::JSONSchema qw(get_standard_option);
+use PVE::RPCEnvironment;
+use PVE::ReplicationConfig;
+use PVE::Replication;
+
+use PVE::RESTHandler;
+
+use base qw(PVE::RESTHandler);
+
+__PACKAGE__->register_method ({
+    name => 'index',
+    path => '',
+    method => 'GET',
+    permissions => { user => 'all' },
+    description => "Directory index.",
+    parameters => {
+	additionalProperties => 0,
+	properties => {
+	    node => get_standard_option('pve-node'),
+	},
+    },
+    returns => {
+	type => 'array',
+	items => {
+	    type => "object",
+	    properties => {},
+	},
+	links => [ { rel => 'child', href => "{name}" } ],
+    },
+    code => sub {
+	my ($param) = @_;
+
+	return [
+	    { name => 'status' },
+	];
+    }});
+
+
+__PACKAGE__->register_method ({
+    name => 'status',
+    path => 'status',
+    method => 'GET',
+    description => "List replication job status.",
+    permissions => {
+	description => "Requires the VM.Audit permission on /vms/<vmid>.",
+	user => 'all',
+    },
+    protected => 1,
+    proxyto => 'node',
+    parameters => {
+	additionalProperties => 0,
+	properties => {
+	    node => get_standard_option('pve-node'),
+	},
+    },
+    returns => {
+	type => 'array',
+	items => {
+	    type => "object",
+	    properties => {},
+	},
+	links => [ { rel => 'child', href => "{id}" } ], # entries carry 'id', not 'vmid'
+    },
+    code => sub {
+	my ($param) = @_;
+	# Returns one entry per job reported by job_status() that the caller may audit.
+	my $rpcenv = PVE::RPCEnvironment::get();
+	my $authuser = $rpcenv->get_user();
+
+	my $jobs = PVE::Replication::job_status();
+
+	my $res = [];
+	foreach my $id (sort keys %$jobs) {
+	    my $d = $jobs->{$id};
+	    my $state = delete $d->{state}; # flatten: selected state fields are copied below
+	    my $vmid = $d->{guest};
+	    next if !$rpcenv->check($authuser, "/vms/$vmid", [ 'VM.Audit' ]);
+	    $d->{id} = $id;
+	    foreach my $k (qw(last_sync fail_count error duration)) {
+		$d->{$k} = $state->{$k} if defined($state->{$k});
+	    }
+	    push @$res, $d;
+	}
+
+	return $res;
+    }});
+
+1;
diff --git a/PVE/API2/ReplicationConfig.pm b/PVE/API2/ReplicationConfig.pm
new file mode 100644
index 00000000..6f4de385
--- /dev/null
+++ b/PVE/API2/ReplicationConfig.pm
@@ -0,0 +1,217 @@
+package PVE::API2::ReplicationConfig;
+
+use warnings;
+use strict;
+
+use PVE::Tools qw(extract_param);
+use PVE::Exception qw(raise_perm_exc);
+use PVE::JSONSchema qw(get_standard_option);
+use PVE::RPCEnvironment;
+use PVE::ReplicationConfig;
+
+use PVE::RESTHandler;
+
+use base qw(PVE::RESTHandler);
+
+__PACKAGE__->register_method ({
+    name => 'index',
+    path => '',
+    method => 'GET',
+    description => "List replication jobs.",
+    permissions => {
+	description => "Requires the VM.Audit permission on /vms/<vmid>.",
+	user => 'all',
+    },
+    parameters => {
+	additionalProperties => 0,
+	properties => {},
+    },
+    returns => {
+	type => 'array',
+	items => {
+	    type => "object",
+	    properties => {},
+	},
+	links => [ { rel => 'child', href => "{id}" } ], # child routes are registered under '{id}'
+    },
+    code => sub {
+	my ($param) = @_;
+	# Returns every configured job whose guest the caller may audit, sorted by job id.
+	my $rpcenv = PVE::RPCEnvironment::get();
+	my $authuser = $rpcenv->get_user();
+
+	my $cfg = PVE::ReplicationConfig->new();
+
+	my $res = [];
+	foreach my $id (sort keys %{$cfg->{ids}}) {
+	    my $d = $cfg->{ids}->{$id};
+	    my $vmid = $d->{guest};
+	    next if !$rpcenv->check($authuser, "/vms/$vmid", [ 'VM.Audit' ]);
+	    $d->{id} = $id; # expose the job id inside the entry itself
+	    push @$res, $d;
+	}
+
+	return $res;
+    }});
+
+__PACKAGE__->register_method ({
+    name => 'read',
+    path => '{id}',
+    method => 'GET',
+    description => "Read replication job configuration.",
+    permissions => {
+	description => "Requires the VM.Audit permission on /vms/<vmid>.",
+	user => 'all',
+    },
+    parameters => {
+    	additionalProperties => 0,
+	properties => {
+	    id => get_standard_option('pve-replication-id'),
+	},
+    },
+    returns => { type => 'object' },
+    code => sub {
+	my ($param) = @_;
+
+	my $rpcenv = PVE::RPCEnvironment::get();
+	my $authuser = $rpcenv->get_user();
+
+	my $cfg = PVE::ReplicationConfig->new();
+
+	my $data = $cfg->{ids}->{$param->{id}};
+
+	die "no such replication job '$param->{id}'\n" if !defined($data);
+
+	my $vmid = $data->{guest};
+
+	raise_perm_exc() if !$rpcenv->check($authuser, "/vms/$vmid", [ 'VM.Audit' ]);
+
+	$data->{id} = $param->{id};
+
+	return $data;
+    }});
+
+__PACKAGE__->register_method ({
+    name => 'create',
+    path => '',
+    protected => 1,
+    method => 'POST',
+    description => "Create a new replication job",
+    permissions => {
+	check => ['perm', '/storage', ['Datastore.Allocate']],
+    },
+    parameters => PVE::ReplicationConfig->createSchema(),
+    returns => { type => 'null' },
+    code => sub {
+	my ($param) = @_;
+
+	my $type = extract_param($param, 'type');
+	my $plugin = PVE::ReplicationConfig->lookup($type);
+	my $id = extract_param($param, 'id');
+
+	my $code = sub {
+	    my $cfg = PVE::ReplicationConfig->new();
+
+	    #die "replication job for guest '$param->{guest}' to target '$param->{target}' already exists\n"
+	    die "replication job '$id' already exists\n"
+		if $cfg->{ids}->{$id};
+
+	    my $opts = $plugin->check_config($id, $param, 1, 1);
+
+	    $cfg->{ids}->{$id} = $opts;
+
+	    $cfg->write();
+	};
+
+	PVE::ReplicationConfig::lock($code);
+
+	return undef;
+    }});
+
+
+__PACKAGE__->register_method ({
+    name => 'update',
+    protected => 1,
+    path => '{id}',
+    method => 'PUT',
+    description => "Update replication job configuration.",
+    permissions => {
+	check => ['perm', '/storage', ['Datastore.Allocate']],
+    },
+    parameters => PVE::ReplicationConfig->updateSchema(),
+    returns => { type => 'null' },
+    code => sub {
+	my ($param) = @_;
+	# Merges the checked parameters into the existing job entry and writes the config.
+	my $id = extract_param($param, 'id');
+
+	my $code = sub {
+	    my $cfg = PVE::ReplicationConfig->new();
+
+	    my $data = $cfg->{ids}->{$id};
+	    die "no such job '$id'\n" if !$data;
+
+	    my $plugin = PVE::ReplicationConfig->lookup($data->{type});
+	    my $opts = $plugin->check_config($id, $param, 0, 1);
+	    # iterate over the keys only - a bare %$opts would also yield the values
+	    foreach my $k (keys %$opts) {
+		$data->{$k} = $opts->{$k};
+	    }
+
+	    $cfg->write();
+	};
+
+	PVE::ReplicationConfig::lock($code); # read-modify-write runs inside the config lock
+
+	return undef;
+    }});
+
+__PACKAGE__->register_method ({
+    name => 'delete',
+    protected => 1,
+    path => '{id}',
+    method => 'DELETE',
+    description => "Delete replication job",
+    permissions => {
+	check => ['perm', '/storage', ['Datastore.Allocate']],
+    },
+    parameters => {
+    	additionalProperties => 0,
+	properties => {
+	    id => get_standard_option('pve-replication-id'),
+	    keep => {
+		description => "Keep replicated data at target (do not remove).",
+		type => 'boolean',
+		optional => 1,
+		default => 0,
+	    },
+	}
+    },
+    returns => { type => 'null' },
+    code => sub {
+	my ($param) = @_;
+
+	my $id = extract_param($param, 'id');
+
+	my $code = sub {
+	    my $cfg = PVE::ReplicationConfig->new();
+
+	    my $data = $cfg->{ids}->{$id};
+	    die "no such job '$id'\n" if !$data;
+
+	    if (!$param->{keep}) {
+		# fixme: cleanup data at target
+
+	    }
+	    # fixme: cleanup snapshots
+
+	    delete $cfg->{ids}->{$id};
+
+	    $cfg->write();
+	};
+
+	PVE::ReplicationConfig::lock($code);
+
+	return undef;
+    }});
+1;
diff --git a/PVE/CLI/Makefile b/PVE/CLI/Makefile
index b005a8f1..3a27503d 100644
--- a/PVE/CLI/Makefile
+++ b/PVE/CLI/Makefile
@@ -1,6 +1,6 @@
 include ../../defines.mk
 
-SOURCES=vzdump.pm pvesubscription.pm pveceph.pm pveam.pm
+SOURCES=vzdump.pm pvesubscription.pm pveceph.pm pveam.pm pvesr.pm
 
 all:
 
diff --git a/PVE/CLI/pvesr.pm b/PVE/CLI/pvesr.pm
new file mode 100644
index 00000000..43359604
--- /dev/null
+++ b/PVE/CLI/pvesr.pm
@@ -0,0 +1,150 @@
+package PVE::CLI::pvesr;
+
+use strict;
+use warnings;
+use POSIX qw(strftime);
+use JSON;
+
+use PVE::JSONSchema qw(get_standard_option);
+use PVE::INotify;
+use PVE::RPCEnvironment;
+use PVE::Tools qw(extract_param);
+use PVE::SafeSyslog;
+use PVE::CLIHandler;
+
+use PVE::Replication;
+use PVE::API2::ReplicationConfig;
+use PVE::API2::Replication;
+
+use base qw(PVE::CLIHandler);
+
+my $nodename = PVE::INotify::nodename();
+
+sub setup_environment {
+    PVE::RPCEnvironment->setup_default_cli_env();
+}
+
+__PACKAGE__->register_method ({
+    name => 'run',
+    path => 'run',
+    method => 'POST',
+    description => "This method is called by the systemd-timer and executes all (or a specific) sync jobs.",
+    parameters => {
+	additionalProperties => 0,
+	properties => {
+	    id => get_standard_option('pve-replication-id', { optional => 1 }),
+	},
+    },
+    returns => { type => 'null' },
+    code => sub {
+	my ($param) = @_;
+
+	if (my $id = extract_param($param, 'id')) {
+
+	    PVE::Replication::run_single_job($id);
+
+	} else {
+
+	    PVE::Replication::run_jobs();
+	}
+
+	return undef;
+    }});
+
+__PACKAGE__->register_method ({
+    name => 'enable',
+    path => 'enable',
+    method => 'POST',
+    description => "Enable a replication job.",
+    parameters => {
+	additionalProperties => 0,
+	properties => {
+	    id => get_standard_option('pve-replication-id'),
+	},
+    },
+    returns => { type => 'null' },
+    code => sub {
+	my ($param) = @_;
+
+	$param->{disable} = 0;
+
+	return PVE::API2::ReplicationConfig->update($param);
+    }});
+
+__PACKAGE__->register_method ({
+    name => 'disable',
+    path => 'disable',
+    method => 'POST',
+    description => "Disable a replication job.",
+    parameters => {
+	additionalProperties => 0,
+	properties => {
+	    id => get_standard_option('pve-replication-id'),
+	},
+    },
+    returns => { type => 'null' },
+    code => sub {
+	my ($param) = @_;
+
+	$param->{disable} = 1;
+
+	return PVE::API2::ReplicationConfig->update($param);
+    }});
+
+my $print_job_list = sub {
+    my ($list) = @_;
+
+    my $format = "%-20s %10s %-20s %10s %5s %8s\n";
+
+    printf($format, "JobID", "GuestID", "Target", "Interval", "Rate", "Enabled");
+
+    foreach my $job (sort { $a->{guest} <=> $b->{guest} } @$list) {
+	my $plugin = PVE::ReplicationConfig->lookup($job->{type});
+	my $tid = $plugin->get_unique_target_id($job);
+
+	printf($format, $job->{id}, $job->{guest}, $tid,
+	       defined($job->{interval}) ? $job->{interval} : '-',
+	       defined($job->{rate}) ? $job->{rate} : '-',
+	       $job->{disable} ? 'no' : 'yes'
+	    );
+    }
+};
+
+my $print_job_status = sub {
+    my ($list) = @_;
+    # Prints a header line plus one formatted line per job, ordered by guest id.
+    my $format = "%-20s %10s %-20s %20s %10s %10s %s\n";
+
+    printf($format, "JobID", "GuestID", "Target", "LastSync", "Duration", "FailCount", "State");
+
+    foreach my $job (sort { $a->{guest} <=> $b->{guest} } @$list) {
+	my $plugin = PVE::ReplicationConfig->lookup($job->{type});
+	my $tid = $plugin->get_unique_target_id($job);
+	# a missing or zero last_sync is shown as '-'
+	my $timestr = $job->{last_sync} ?
+	    strftime("%Y-%m-%d_%H:%M:%S", localtime($job->{last_sync})) : '-';
+
+	printf($format, $job->{id}, $job->{guest}, $tid,
+	       $timestr, $job->{duration} // '-',
+	       $job->{fail_count}, $job->{error} // 'OK');
+    }
+};
+
+our $cmddef = {
+    status => [ 'PVE::API2::Replication', 'status', [], { node => $nodename }, $print_job_status ],
+
+    jobs => [ 'PVE::API2::ReplicationConfig', 'index' , [], {}, $print_job_list ],
+    read => [ 'PVE::API2::ReplicationConfig', 'read' , ['id'], {},
+	     sub { my $res = shift; print to_json($res, { utf8 => 1, pretty => 1, canonical => 1}); }],
+    update => [ 'PVE::API2::ReplicationConfig', 'update' , ['id'], {} ],
+    delete => [ 'PVE::API2::ReplicationConfig', 'delete' , ['id'], {} ],
+    'create-local-job' => [ 'PVE::API2::ReplicationConfig', 'create' , ['id', 'guest', 'target'],
+			    { type => 'local' } ],
+
+    enable => [ __PACKAGE__, 'enable', ['id'], {}],
+    disable => [ __PACKAGE__, 'disable', ['id'], {}],
+
+    run => [ __PACKAGE__ , 'run'],
+};
+
+1;
diff --git a/PVE/Makefile b/PVE/Makefile
index ac230fbf..05086629 100644
--- a/PVE/Makefile
+++ b/PVE/Makefile
@@ -3,6 +3,7 @@ include ../defines.mk
 SUBDIRS=API2 Status CLI Service
 
 PERLSOURCE = 			\
+	Replication.pm		\
 	API2.pm			\
 	API2Tools.pm		\
 	HTTPServer.pm		\
diff --git a/PVE/Replication.pm b/PVE/Replication.pm
new file mode 100644
index 00000000..1becbcfc
--- /dev/null
+++ b/PVE/Replication.pm
@@ -0,0 +1,242 @@
+package PVE::Replication;
+
+use warnings;
+use strict;
+use Data::Dumper;
+use JSON;
+use Time::HiRes qw(gettimeofday tv_interval);
+
+use PVE::INotify;
+use PVE::Tools;
+use PVE::Cluster;
+use PVE::QemuConfig;
+use PVE::QemuServer;
+use PVE::LXC::Config;
+use PVE::LXC;
+use PVE::Storage;
+use PVE::ReplicationConfig;
+
+# Note: regression tests can overwrite $state_path for testing
+our $state_path = "/var/lib/pve-manager/pve-replication-state.json";
+
+my $update_job_state = sub {
+    my ($stateobj, $jobcfg, $state) = @_;
+
+    my $plugin = PVE::ReplicationConfig->lookup($jobcfg->{type});
+
+    my $vmid = $jobcfg->{guest};
+    my $tid = $plugin->get_unique_target_id($jobcfg);
+
+    # Note: tuple ($vmid, $tid) is unique
+    $stateobj->{$vmid}->{$tid} = $state;
+
+    PVE::Tools::file_set_contents($state_path, encode_json($stateobj));
+};
+
+my $get_job_state = sub {
+    my ($stateobj, $jobcfg) = @_;
+
+    my $plugin = PVE::ReplicationConfig->lookup($jobcfg->{type});
+
+    my $vmid = $jobcfg->{guest};
+    my $tid = $plugin->get_unique_target_id($jobcfg);
+    my $state = $stateobj->{$vmid}->{$tid};
+
+    $state = {} if !$state;
+
+    $state->{last_iteration} //= 0;
+    $state->{last_sync} //= 0;
+    $state->{fail_count} //= 0;
+
+    return $state;
+};
+
+my $read_state = sub {
+
+    return {} if ! -e $state_path;
+
+    my $raw = PVE::Tools::file_get_contents($state_path);
+
+    return {} if $raw eq '';
+
+    return decode_json($raw);
+};
+
+sub job_status {
+    my ($stateobj) = @_;
+    # Returns a hash ref (job id => job config) of jobs to consider on this node.
+    my $local_node = PVE::INotify::nodename();
+
+    my $jobs = {};
+
+    $stateobj = $read_state->() if !$stateobj; # load state from disk unless the caller passed it
+
+    my $cfg = PVE::ReplicationConfig->new();
+
+    my $vms = PVE::Cluster::get_vmlist();
+
+    foreach my $jobid (sort keys %{$cfg->{ids}}) {
+	my $jobcfg = $cfg->{ids}->{$jobid};
+	my $vmid = $jobcfg->{guest};
+
+	die "internal error - not implemented" if $jobcfg->{type} ne 'local';
+
+	# skip non existing vms
+	next if !$vms->{ids}->{$vmid};
+
+	# only consider guest on local node
+	next if $vms->{ids}->{$vmid}->{node} ne $local_node;
+
+	# never sync to local node
+	next if $jobcfg->{target} eq $local_node;
+	# skip disabled jobs
+	next if $jobcfg->{disable};
+
+	$jobcfg->{state} = $get_job_state->($stateobj, $jobcfg); # attach state (with defaults)
+	$jobcfg->{id} = $jobid;
+	$jobcfg->{vmtype} = $vms->{ids}->{$vmid}->{type};
+
+	$jobs->{$jobid} = $jobcfg;
+    }
+
+    return $jobs;
+}
+
+my $get_next_job = sub {
+    my ($stateobj, $now) = @_;
+    # Returns the next due job (marked with last_iteration = $now), or undef if none is due.
+    my $next_jobid;
+
+    my $jobs = job_status($stateobj);
+
+    # compute next_sync here to make it easy to sort jobs
+    my $next_sync_hash = {};
+    foreach my $jobid (keys %$jobs) {
+	my $jobcfg = $jobs->{$jobid};
+	my $interval = $jobcfg->{interval} || 15; # scaled by 60 below; 15 if unset
+	my $last_sync = $jobcfg->{state}->{last_sync};
+	$next_sync_hash->{$jobid} = $last_sync + $interval * 60;
+    }
+
+    my $sort_func = sub {
+	my $joba = $jobs->{$a};
+	my $jobb = $jobs->{$b};
+	my $sa =  $joba->{state};
+	my $sb =  $jobb->{state};
+	my $res = $sa->{last_iteration} <=> $sb->{last_iteration}; # numeric: these are timestamps
+	return $res if $res != 0;
+	$res = $next_sync_hash->{$a} <=> $next_sync_hash->{$b};
+	return $res if $res != 0;
+	return  $joba->{guest} <=> $jobb->{guest};
+    };
+
+    foreach my $jobid (sort $sort_func keys %$jobs) {
+	my $jobcfg = $jobs->{$jobid};
+	next if $jobcfg->{state}->{last_iteration} >= $now; # already handled in this iteration
+	if ($now >= $next_sync_hash->{$jobid}) {
+	    $next_jobid = $jobid;
+	    last;
+	}
+    }
+
+    return undef if !$next_jobid;
+
+    my $jobcfg = $jobs->{$next_jobid};
+    # persist the marker so the same job is not picked again for this $now
+    $jobcfg->{state}->{last_iteration} = $now;
+    $update_job_state->($stateobj, $jobcfg,  $jobcfg->{state});
+
+    return $jobcfg;
+};
+
+sub replicate {
+    my ($jobcfg, $start_time) = @_;
+
+    die "implement me";
+}
+
+my $run_replication = sub {
+    my ($stateobj, $jobcfg, $start_time) = @_;
+    # Runs replicate() for one job and records the outcome in the job state.
+    my $state = delete $jobcfg->{state};
+
+    my $t0 = [gettimeofday];
+
+    eval { replicate($jobcfg, $start_time); };
+    my $err = $@;
+
+    $state->{duration} = tv_interval($t0); # elapsed time of the attempt, success or failure
+
+    if ($err) {
+	$state->{fail_count}++;
+	$state->{error} = "$err";
+	$update_job_state->($stateobj, $jobcfg,  $state);
+   } else {
+	$state->{last_sync} = $start_time;
+	$state->{fail_count} = 0;
+	delete $state->{error};
+	$update_job_state->($stateobj, $jobcfg,  $state);
+    }
+};
+
+sub run_single_job {
+    my ($jobid, $now) = @_; # passing $now useful for regression testing
+
+    my $local_node = PVE::INotify::nodename();
+
+    my $code = sub {
+	$now //= time();
+
+	my $stateobj = $read_state->();
+
+	my $cfg = PVE::ReplicationConfig->new();
+
+	my $jobcfg = $cfg->{ids}->{$jobid};
+	die "no such job '$jobid'\n" if !$jobcfg;
+
+	die "internal error - not implemented" if $jobcfg->{type} ne 'local';
+
+	die "job '$jobid' is disabled\n" if $jobcfg->{disable};
+
+	my $vms = PVE::Cluster::get_vmlist();
+	my $vmid = $jobcfg->{guest};
+
+	die "no such guest '$vmid'\n" if !$vms->{ids}->{$vmid};
+
+	die "guest '$vmid' is not on local node\n"
+	    if $vms->{ids}->{$vmid}->{node} ne $local_node;
+
+	die "unable to sync to local node\n" if $jobcfg->{target} eq $local_node;
+
+	$jobcfg->{state} = $get_job_state->($stateobj, $jobcfg);
+	$jobcfg->{id} = $jobid;
+	$jobcfg->{vmtype} = $vms->{ids}->{$vmid}->{type};
+
+	$jobcfg->{state}->{last_iteration} = $now;
+	$update_job_state->($stateobj, $jobcfg,  $jobcfg->{state});
+
+	$run_replication->($stateobj, $jobcfg, $now);
+    };
+
+    my $res = PVE::Tools::lock_file($state_path, 60, $code);
+    die $@ if $@;
+}
+
+sub run_jobs {
+    my ($now) = @_; # passing $now useful for regression testing
+    # Runs every job that $get_next_job reports as due, holding a lock on $state_path.
+    my $code = sub {
+	my $stateobj = $read_state->();
+	my $start_time = $now // time();
+
+	while (my $jobcfg = $get_next_job->($stateobj, $start_time)) {
+	    $run_replication->($stateobj, $jobcfg, $start_time);
+	    $start_time = $now // time(); # refresh the clock unless $now was passed in
+	}
+    };
+
+    my $res = PVE::Tools::lock_file($state_path, 60, $code);
+    die $@ if $@; # re-raise the error left in $@ after lock_file returns
+}
+
+1;
diff --git a/bin/Makefile b/bin/Makefile
index f9143eab..c7fca9f8 100644
--- a/bin/Makefile
+++ b/bin/Makefile
@@ -9,7 +9,7 @@ export PERLLIB=..
 SUBDIRS = init.d ocf test
 
 SERVICES = pvestatd pveproxy pvedaemon spiceproxy
-CLITOOLS = vzdump pvesubscription pveceph pveam
+CLITOOLS = vzdump pvesubscription pveceph pveam pvesr
 
 SCRIPTS =  			\
 	${SERVICES}		\
diff --git a/bin/pvesr b/bin/pvesr
new file mode 100644
index 00000000..762bb356
--- /dev/null
+++ b/bin/pvesr
@@ -0,0 +1,8 @@
+#!/usr/bin/perl -T
+
+use strict;
+use warnings;
+
+use PVE::CLI::pvesr;
+
+PVE::CLI::pvesr->run_cli_handler();
-- 
2.11.0




More information about the pve-devel mailing list