DZone Snippets is a public source code repository. Easily build up your personal collection of code snippets, categorize them with tags / keywords, and share them with the world.
SolarSync
// Mostly Dan's code.
#!/usr/bin/perl
#
# New and revised, started 24 March 2004
#
use DBI;
use IO::Handle;
use POSIX ":sys_wait_h";
use Thread;
# Debug output is switched on by passing --debug as the first command-line
# argument; any other invocation leaves it off.
our $debug = ($ARGV[0] eq "--debug") ? 1 : 0;
######
# Cleanly exit SolarSync
######
# Shutdown flag: stays "no" until one of the signals below arrives.
$time_to_stop = "no";

# Signal handler: announce the shutdown and raise the stop flag.
sub stop_SolarSync {
    print "Stopping SolarSync...\n";
    $time_to_stop = "yes";
}

# Route Ctrl-C (INT), TERM and HUP to the same handler.
$SIG{$_} = \&stop_SolarSync for qw(INT TERM HUP);
######
# Settings (hard-coded below; no settings file is read yet)
######
# Database connection parameters.
our ($database, $host, $user, $password)
    = ("solarsync", "localhost", "root", "");

# Length of every supported action interval, in seconds, keyed by the
# interval label.  The table below lists each interval in minutes.
our %seconds = map { $_->[0] => 60 * $_->[1] } (
    [ "1m",  1 ],
    [ "5m",  5 ],
    [ "10m", 10 ],
    [ "30m", 30 ],
    [ "1h",  60 ],
    [ "6h",  60 * 6 ],
    [ "12h", 60 * 12 ],
    [ "1d",  60 * 24 ],
);

# Prefix prepended to every module command.
our $module_path = "modules/";
######
# Connect to the database, read settings
######
# Global database handler
# Database handle used by the main program only; every kiosk thread opens
# its own connection.
my $dbh = DBI->connect("DBI:mysql:database=$database;host=$host",
    "$user", "$password", { 'RaiseError' => 1 });

# Load every active SolarKiosk.  The rows are fetched up front and counted
# from the result itself, because DBI only guarantees rows() after a
# non-SELECT statement.
my $sth = $dbh->prepare("SELECT sk_id, sk_name, sk_ip_address "
    . "FROM solarkiosk WHERE active='yes'");
$sth->execute();
my $kiosks = $sth->fetchall_arrayref({});
$sth->finish();

# The handle is not needed once the rows are in memory, so release it
# before any thread is started.
$dbh->disconnect();

$NUM_SOLARKIOSKS = scalar @{$kiosks};

# Print how many SolarKiosks we found in the database
print $NUM_SOLARKIOSKS, " SolarKiosks found.\n";

#####
# Create a thread for each SolarKiosk
#####
my @sk_threads;
for my $kiosk (@{$kiosks}) {
    push @sk_threads, Thread->new(\&SolarKioskThread,
        $kiosk->{'sk_id'},
        $kiosk->{'sk_name'},
        $kiosk->{'sk_ip_address'});
    print "Started thread for SolarKiosk ",
        $kiosk->{'sk_name'},
        " with ip address of ",
        $kiosk->{'sk_ip_address'}, "\n";
}
#####
# The thread for each SolarKiosk
#####
# Scheduler loop for one SolarKiosk; runs in its own thread.
#
# Parameters (positional):
#   $sk_id         - kiosk id, handed to enqueue() to look up due actions
#   $sk_name       - kiosk name (accepted, not used inside this sub)
#   $sk_ip_address - kiosk IP address (accepted, not used inside this sub)
#
# About every five seconds the loop (1) polls the command running in each
# priority slot, (2) asks enqueue() for the actions of every interval that is
# due, and (3) starts the next command of the highest-priority non-empty
# queue, pausing lower-priority commands while it runs.  Slot index 2 is the
# highest priority, index 0 the lowest.  The loop never ends on its own.
#
# Note: the scheduling algorithm comes from Daniel Purcell's first version of
# SolarSync (August 2003).
sub SolarKioskThread {
    my $sk_id         = shift;
    my $sk_name       = shift;
    my $sk_ip_address = shift;

    # Per-slot state.  @mutex holds the latest waitpid() result for the slot:
    # -1 = nothing to wait for (slot free), 0 = command still running,
    # any other value = pid of a command that has just been reaped.
    my @mutex       = (0, 0, 0);
    my @states      = (-1, -1, -1);   # @mutex as seen on the previous change
    my @pause       = (0, 0, 0);      # 1 while the slot's command is SIGSTOPped
    my @command_pid = (0, 0, 0);      # pid of the command started in each slot
    my @cmd_fh;                       # read end of each slot's command pipe
    my @queue;                        # $queue[$pri] = list of [command, expiry]
    my $counter = 10;                 # loop iterations, i.e. units of ~5 s

    # "cron" table: [iterations between runs, interval label].  To add weekly
    # or monthly events, divide the period in seconds by five and add a row
    # (plus the matching entry in %seconds).
    my @schedule = (
        [ 12,    "1m"  ],
        [ 60,    "5m"  ],
        [ 120,   "10m" ],
        [ 360,   "30m" ],
        [ 720,   "1h"  ],
        [ 4320,  "6h"  ],
        [ 8640,  "12h" ],
        [ 17280, "1d"  ],
    );

    # Make a connection to the database for this thread
    my $dbh = DBI->connect("DBI:mysql:database=$database;host=$host",
        "$user", "$password", { 'RaiseError' => 1 });

    TICK:
    while (1) {
        sleep(5);

        # Poll each slot's command without blocking.  A slot that never
        # started a command is reported free directly: waitpid(0, ...) would
        # wait on *any* child of the process group and could reap a command
        # that belongs to another slot.
        for my $slot (reverse 0 .. $#command_pid) {
            $mutex[$slot] = $command_pid[$slot]
                ? waitpid($command_pid[$slot], WNOHANG)
                : -1;
        }

        # On every state change of a slot, close that slot's pipe and
        # remember the new state of that same slot.
        # NOTE(review): close() on a piped handle waits for the child to
        # exit, so the "started" transition blocks this loop until the
        # command finishes -- confirm that this is intended.
        for my $slot (0 .. $#mutex) {
            next if $mutex[$slot] == $states[$slot];
            close $cmd_fh[$slot] if defined $cmd_fh[$slot];
            $states[$slot] = $mutex[$slot];
        }

        print "\tCHILD: Pri 3 in use: $mutex[2]\n" if $debug;
        print "\tCHILD: Pri 2 in use: $mutex[1]\n" if $debug;
        print "\tCHILD: Pri 1 in use: $mutex[0]\n" if $debug;

        ###############
        # The scheduler
        ###############
        ++$counter;
        print $counter, "\n" if $debug;
        if ($counter > 120960) {    # 120960 iterations of 5 s = one week
            print "Reseting our counter to 0\n" if $debug;
            $counter = 0;
        }

        # Enqueue the actions of EVERY interval that is due on this tick.
        # All longer periods are multiples of the one-minute period, so
        # stopping at the first match would starve every interval but "1m".
        for my $entry (@schedule) {
            my ($every, $interval) = @{$entry};
            next if $counter % $every;

            for my $element (enqueue($sk_id, $interval, $dbh)) {
                # enqueue() returns [priority, command, expiry]; the priority
                # selects the queue, the rest is stored.
                # NOTE(review): the priority is used directly as an index, so
                # it presumably has to be 0, 1 or 2 -- verify against the
                # `actions` table.
                my $pri = shift @{$element};
                print "priority is: $pri\n" if $debug;
                push @{ $queue[$pri] }, $element;
            }
        }
        # END OF SCHEDULER

        # Process the queues from highest (2) to lowest (0) priority.
        for my $slot (reverse 0 .. $#mutex) {
            my @at_or_above = ($slot .. $#mutex);

            # Start a command only if this slot and every higher one is free
            # and there is something queued for this slot.
            if (!grep({ $mutex[$_] != -1 } @at_or_above)
                && @{ $queue[$slot] || [] })
            {
                my $thisCmdRef = shift @{ $queue[$slot] };
                my ($cmd, $expiration) = @{$thisCmdRef};

                # Drop a command whose expiration time has passed and give
                # up on the rest of this tick.
                next TICK if time() > $expiration;

                # Suspend lower-priority commands that are in use and not
                # yet paused.
                for my $lower (reverse 0 .. $slot - 1) {
                    next if $mutex[$lower] == -1 || $pause[$lower] == 1;
                    kill 'SIGSTOP', $command_pid[$lower];
                    $pause[$lower] = 1;
                }

                # Execute the command, reading its output through a pipe.
                my $pid = open(my $fh, '-|', $cmd);
                $command_pid[$slot] = $pid;
                $cmd_fh[$slot]      = $fh;

                # Mark the slot busy for the rest of this tick.  Otherwise
                # the resume step below would undo the pause at once and a
                # lower queue could start a command in the same pass.
                $mutex[$slot] = 0 if $pid;

                print "$cmd", "\n" if $debug;
            }

            # If nothing runs at this priority or above, wake up any
            # lower-priority command that is still paused.
            if (!grep { $mutex[$_] != -1 } @at_or_above) {
                for my $lower (reverse 0 .. $slot - 1) {
                    next unless $pause[$lower];
                    kill 'SIGCONT', $command_pid[$lower];
                    $pause[$lower] = 0;
                }
            }
        }
    }

    # Not reached while the loop above has no exit condition.
    $dbh->disconnect();
}
# Park the main program until a signal handler sets $time_to_stop to "yes".
# Sleeping between checks keeps the wait from spinning a CPU core; an
# arriving signal interrupts sleep(), so shutdown is still prompt.
while ($time_to_stop ne "yes") {
    sleep 1;
}
##########
# sub enqueue, reads settings and queues the values to run for the solar
# kiosk
##########
# Build the list of commands that are due for one kiosk and one interval.
#
# Parameters (positional):
#   $sk_id    - id of the SolarKiosk whose actions are looked up
#   $interval - interval label, a key of %seconds (e.g. "1m", "1h")
#   $dbh      - open DBI database handle to query with
#
# Returns a list of array references [priority, command, expiration], one
# per matching row of `actions` in `order` order.  The command is the
# module's command line (prefixed with $module_path) with the =arg1..=arg5
# and =lcontent placeholders filled in, followed by "; <rsync command>" when
# the module asks for an rsync afterwards.  The expiration is the current
# time plus the interval length in seconds.  Actions whose module row cannot
# be found are skipped with a warning.
#
# NOTE(review): the command is assembled from database values and later run
# through a shell; the tables must only contain trusted data.
sub enqueue {
    my ($sk_id, $interval, $dbh) = @_;
    my @returnArray;

    # The kiosk row does not change between actions: load it on first use
    # only and reuse it afterwards.
    my $kiosk_row;
    my $kiosk = sub {
        $kiosk_row ||= $dbh->selectrow_hashref(
            "SELECT `sk_ip_address`, `sk_local_content_dir`, "
            . "`sk_remote_content_dir`, `rsync_cmd` "
            . "FROM `solarkiosk` WHERE `sk_id` = ?",
            undef, $sk_id) || {};
        return $kiosk_row;
    };

    # Substitute =name placeholders in a string; undefined values become "".
    my $fill = sub {
        my ($text, @pairs) = @_;
        $text = '' unless defined $text;
        while (my ($name, $value) = splice(@pairs, 0, 2)) {
            $value = '' unless defined $value;
            $text =~ s/=\Q$name\E/$value/g;
        }
        return $text;
    };

    # Bind values instead of interpolating them into the SQL text.
    my $action_sth = $dbh->prepare(
        "SELECT * FROM `actions` WHERE `sk_id` = ? "
        . "AND `action_interval` = ? ORDER BY `order`");
    $action_sth->execute($sk_id, $interval);

    while (my $action = $action_sth->fetchrow_hashref) {
        my $module = $dbh->selectrow_hashref(
            "SELECT `mod_command`, `mod_rsync_after` "
            . "FROM `modules` WHERE `mod_id` = ?",
            undef, $action->{'mod_id'});
        unless ($module) {
            my $mod_id = defined $action->{'mod_id'} ? $action->{'mod_id'} : '';
            warn "enqueue: no module with mod_id '$mod_id', action skipped\n";
            next;
        }

        my $command = $fill->(
            $module_path . $module->{'mod_command'},
            map { $_ => $action->{$_} } qw(arg1 arg2 arg3 arg4 arg5));

        if ($command =~ /=lcontent/) {
            $command = $fill->($command,
                lcontent => $kiosk->()->{'sk_local_content_dir'});
        }

        if (($module->{'mod_rsync_after'} || 0) == 1) {
            my $sk = $kiosk->();
            my $rsync_cmd = $fill->(
                $sk->{'rsync_cmd'},
                ip_address => $sk->{'sk_ip_address'},
                lcontent   => $sk->{'sk_local_content_dir'},
                rcontent   => $sk->{'sk_remote_content_dir'});
            $command = $command . "; " . $rsync_cmd;
        }

        if ($debug) {
            print "Priority is: ", $action->{'priority'}, "\n";
            print "Command is: ", $command, "\n";
        }

        # (priority, exec_command, exp_time)
        push @returnArray,
            [ $action->{'priority'}, $command, time() + $seconds{$interval} ];
    }

    return @returnArray;
}





