DZone Snippets is a public source code repository. Easily build up your personal collection of code snippets, categorize them with tags / keywords, and share them with the world

Snippets has posted 5883 posts at DZone. View Full User Profile

SolarSync

02.03.2006
| 1458 views |
  • submit to reddit
        // Mostly Dan's code.

#!/usr/bin/perl

#
# New and revised, started 24 March 2004
#

use DBI;
use IO::Handle;
use POSIX ":sys_wait_h";
use Thread;

our $debug = 0;
if ($ARGV[0] eq "--debug")
{
   $debug = 1;
}

######
# Cleanly exit SolarSync
######
$time_to_stop = "no";
sub stop_SolarSync
{
   print "Stopping SolarSync...\n";
   $time_to_stop = "yes";
}
# This catches Ctrl-C, etc.
$SIG{INT} = $SIG{TERM} = $SIG{HUP} = \&stop_SolarSync;

######
# Read Settings File
######
our $database = "solarsync";
our $host     = "localhost";
our $user     = "root";
our $password = "";
our %seconds = (
                 "1m"   => 60,
                 "5m"   => 60 * 5,
                 "10m"  => 60 * 10,
                 "30m"  => 60 * 30,
                 "1h"   => 60 * 60,
                 "6h"   => 60 * 60 * 6,
                 "12h"  => 60 * 60 * 12,
                 "1d"   => 60 * 60 * 24
               );
our $module_path = "modules/";

######
# Connect to the database, read settings
######
# Global database handler
my $dbh = DBI->connect("DBI:mysql:database=$database;host=$host",
                        "$user", "$password", {'RaiseError' => 1});
# Figure out how many SolarKiosks to use
my $sth = $dbh->prepare("SELECT sk_id, sk_name, sk_ip_address "
                      . "FROM solarkiosk WHERE active='yes'");
$sth->execute();
$NUM_SOLARKIOSKS = $sth->rows;

# Print how many SolarKiosks we found in the database
print $sth->rows, " SolarKiosks found.\n";

#####
# Create a queue and a thread for each SolarKiosk
#####
my @sk_threads;
for ($i = 0; $i < $NUM_SOLARKIOSKS; ++$i)
{
   $result = $sth->fetchrow_hashref();
   # remember to start a thread with the information from the query string above
   # $queue[$i]
   $sk_threads[$i] = Thread->new(\&SolarKioskThread,
                                 $result->{'sk_id'},
                                 $result->{'sk_name'},
                                 $result->{'sk_ip_address'});
   print "Started thread for SolarKiosk ",
         $result->{'sk_name'},
         " with ip address of ",
         $result->{'sk_ip_address'}, "\n";
}

$sth->finish();
$dbh->disconnect();

#####
# The thread for each SolarKiosk
#####
sub SolarKioskThread
{
   # get parameters from the powers that summoned our thread/subroutine
   my $sk_id = shift;
   my $sk_name = shift;
   my $sk_ip_address = shift;

   my @mutex  = (0,0,0); # mutual exclusion lock for our priority queues
   my @states = (-1, -1, -1); # keeps track of state changes for our priority queues
   my @pause  = (0,0,0); # shows if there are paused processes
   my @command_pid = (0,0,0); # index if PIDs of what is running where now

   my @queue; # multidimensional array to hold que data

   my $counter = 10; # keep track of time with this variable

   # Make a connection to the database for this thread
   my $dbh = DBI->connect("DBI:mysql:database=$database;host=$host",
                        "$user", "$password", {'RaiseError' => 1});

   while(1)
   {
      # Note: this code mostly comes from the algorithm developed by Daniel
      # Purcell, August 2003 in the first version of SolarSync.  If one can
      # find an easier and prettier scheduling algorithm, please implement it
      # here.

      # Note: this while loop will iterate about every five seconds.
      sleep (5);

      # waitpid stuff goes here
      $mutex[2] = waitpid ($command_pid[2], &WNOHANG);
      $mutex[1] = waitpid ($command_pid[1], &WNOHANG);
      $mutex[0] = waitpid ($command_pid[0], &WNOHANG);

      # Check for state changes in the upload
      for ($i = 0; $i <= $#mutex; ++$i)
      {
         if ($mutex[$i] != $states[$i])
         {
            # Close filehandles on the appropriate state change.
            if ($i == 2)
            {
               # DEBUG OUTPUT HERE--
               close CMD3;
            }
            if ($i == 1)
            {
               close CMD2;
            }
            elsif ($i == 0)
            {
               close CMD1;
            }
            $states[2] = $mutex[2];
         }
      }

      print "\tCHILD: Pri 3 in use: $mutex[2]\n" if $debug;
      print "\tCHILD: Pri 2 in use: $mutex[1]\n" if $debug;
      print "\tCHILD: Pri 1 in use: $mutex[0]\n" if $debug;

      ###############
      # The scheduler
      ##############
      # This is a very simple scheduler.  It has a counter, and checks if
      # something needs to run at what time, etc.

      ++$counter; # increment our time counter

      print $counter, "\n" if $debug;

      if ($counter > "120960") #120960 is how many iterations in a week
      {
         # reset our counter because we don't want to go way past a week's
         # worth yet.
         print "Reseting our counter to 0\n" if $debug;
         $counter = 0;
      }

      # Begin our "cron" process here
      SWITCH: # a perl-like switch statement
      {
         ($counter % 12 == 0)   && do # Every minute
                              {
               @result = enqueue($sk_id, "1m", $dbh);
               foreach $element (@result)
               {
                  my $pri = shift @{$element};
print "priority is: $pri\n" if $debug;
                  push @{ $queue[$pri] }, $element;
               }

               last SWITCH;
                              };
         ($counter % 60 == 0)   && do # Every five minutes
                              {
               foreach $element (enqueue($sk_id, "5m", $dbh))
               {
                  my $pri = shift @{$element};
                  push @{ $queue[$pri] }, $element;
               }
               last SWITCH;
                              };
         ($counter % 120 == 0)   && do # Every ten minutes
                              {
               foreach $element (enqueue($sk_id, "10m", $dbh))
               {
                  my $pri = shift @{$element};
                  push @{ $queue[$pri] }, $element;
               }
               last SWITCH;
                              };
         ($counter % 360 == 0)   && do # Every half hour
                              {
               foreach $element (enqueue($sk_id, "30m", $dbh))
               {
                  my $pri = shift @{$element};
                  push @{ $queue[$pri] }, $element;
               }
               last SWITCH;
                              };
         ($counter % 720 == 0)   && do # Every hour
                              {
               foreach $element (enqueue($sk_id, "1h", $dbh))
               {
                  my $pri = shift @{$element};
                  push @{ $queue[$pri] }, $element;
               }
               last SWITCH;
                              };
         ($counter % 4320 == 0)   && do # Every six hours
                              {
               foreach $element (enqueue($sk_id, "6h", $dbh))
               {
                  my $pri = shift @{$element};
                  push @{ $queue[$pri] }, $element;
               }
               last SWITCH;
                              };
         ($counter % 8640 == 0)   && do # Every twelve hours
                              {
               foreach $element (enqueue($sk_id, "12h", $dbh))
               {
                  my $pri = shift @{$element};
                  push @{ $queue[$pri] }, $element;
               }
               last SWITCH;
                              };
         ($counter % 17280 == 0)   && do # Every day
                              {
               foreach $element (enqueue($sk_id, "1d", $dbh))
               {
                  my $pri = shift @{$element};
                  push @{ $queue[$pri] }, $element;
               }
               last SWITCH;
                              };

               # to add more "cron" events, such as weekly or monthly,
               # do the similar thing here.  For example, take the number of
               # seconds there are in a week and devide by five
               # to get the modulus number needed above,
               # and add the appropriate lines in the settings files.
      }
      # END OF SCHEDULER

      # Process queue3, the highest priority queue.
      #
      # If we are not currently transfering a file, and if we have something
      # in the queue to process...
      if ($mutex[2] == -1 && $#{$queue[2]} >= 0)
      {
         # shift our command
         my $thisCmdRef = shift @{$queue[2]};
         # get cmd, expiration
         my $cmd         = $thisCmdRef->[0];
         my $expiration  = $thisCmdRef->[1];

         # cancel if expiration time has passed
         next if (time() > $expiration);

         # Pause other file transfers if they are in use and not paused
         if ($mutex[1] != -1 && $pause[1] != 1)
         {
            # suspend the process
            kill SIGSTOP => $command_pid[1];
            $pause[1] = 1;
         }
         if ($mutex[0] != -1 && $pause[0] != 1)
         {
            kill SIGSTOP => $command_pid[0];
            $pause[0] = 1;
         }

         # execute the command
         $command_pid[2] = open (CMD3, "$cmd |");
         print "$cmd", "\n" if $debug;
      }

      # If we are not transfering a file, put still have other processes in
      # pause mode, start them up again
      if ($mutex[2] == -1 && ($pause[1] || $pause[0]))
      {
         if ($pause[1])
         {
            kill SIGCONT => $command_pid[1];
            $pause[1] = 0;
         }
         if ($pause[0])
         {
            kill SIGCONT => $command_pid[0];
            $pause[0] = 0;
         }
      }
      # Process queue2, the medium priority queue.
      #
      # If we are not currently transfering a file, and if we have something
      # in the queue to process...
      if ($mutex[1] == -1 && $mutex[2] == -1 && $#{$queue[1]} >= 0)
      {
         my $thisCmdRef = shift @{$queue[1]};

         # get cmd, expiration
         my $cmd         = $thisCmdRef->[0];
         my $expiration  = $thisCmdRef->[1];

         # cancel if expiration time has passed
         next if (time() > $expiration);

         # Pause other file transfers if they are in use and not paused
         if ($mutex[0] != -1 && $pause[0] != 1)
         {
            kill SIGSTOP => $command_pid[0];
            $pause[0] = 1;
         }

         # execute the command
         $command_pid[1] = open (CMD2, "$cmd |");
         print "$cmd", "\n" if $debug;
      }

      # If we are not transfering a file, put still have other processes in
      # pause mode, start them up again
      if ($mutex[2] == -1 && $mutex[1] == -1 && $pause[0])
      {
         if ($pause[0])
         {
            kill SIGCONT => $command_pid[0];
            $pause[0] = 0;
         }
      }
      # Process queue[0], the lowest priority queue.
      #
      # If we are not currently transfering a file, and if we have something
      # in the queue to process...
      if ($mutex[0] == -1 && $mutex[1] == -1 && $mutex[2] == -1 &&
         $#{$queue[0]} >= 0)
      {
         my $thisCmdRef = shift @{$queue[0]};

         # get cmd, expiration
         my $cmd         = $thisCmdRef->[0];
         my $expiration  = $thisCmdRef->[1];

         # cancel if expiration time has passed
         next if (time() > $expiration);

         # execute the command
         $command_pid[0] = open (CMD1, "$cmd |");
         print "$cmd", "\n" if $debug;
      }

   }
   $dbh->disconnect();
}

while ($time_to_stop ne "yes")
{
   # do nothing until we have to stop
}

##########
# sub enqueue, reads settings and queues the values to run for the solar
# kiosk
##########
sub enqueue
{
   my $sk_id = shift;
   my $interval = shift;
   my $dbh = shift;
   my @returnArray;
   my $action_sth = $dbh->prepare("SELECT * FROM `actions` WHERE "
      . "`sk_id`='$sk_id' "
      . "AND `action_interval`='$interval' ORDER BY `order`");
   $action_sth->execute();

   while ($actionHash = $action_sth->fetchrow_hashref)
   {
      my $module_sth = $dbh->prepare("SELECT `mod_command`, `mod_rsync_after` "
         . "FROM `modules` WHERE `mod_id`='" . $actionHash->{"mod_id"} . "'");
      $module_sth->execute();

      my $moduleHash = $module_sth->fetchrow_hashref;

      my $command = $module_path . $moduleHash->{'mod_command'};

      $command =~ s/=arg1/$actionHash->{'arg1'}/g;
      $command =~ s/=arg2/$actionHash->{'arg2'}/g;
      $command =~ s/=arg3/$actionHash->{'arg3'}/g;
      $command =~ s/=arg4/$actionHash->{'arg4'}/g;
      $command =~ s/=arg5/$actionHash->{'arg5'}/g;

      if ($command =~ /=lcontent/)
      {
         my $sk_sth = $dbh->prepare("SELECT `sk_local_content_dir` FROM `solarkiosk` "
            . "WHERE `sk_id`='$sk_id'");
         $sk_sth->execute();
         my $skHash = $sk_sth->fetchrow_hashref;
         $command =~ s/=lcontent/$skHash->{'sk_local_content_dir'}/g;
      }

      if ($moduleHash->{'mod_rsync_after'} == 1)
      {
         my $sk_sth = $dbh->prepare("SELECT `sk_ip_address`, `sk_local_content_dir`, "
            . "`sk_remote_content_dir`, `rsync_cmd` "
            . "FROM `solarkiosk` WHERE `sk_id`='" . $sk_id . "'");
         $sk_sth->execute();
         my $skHash = $sk_sth->fetchrow_hashref;
         my $rsyc_cmd = $skHash->{'rsync_cmd'};
         $rsyc_cmd =~ s/=ip_address/$skHash->{'sk_ip_address'}/g;
         $rsyc_cmd =~ s/=lcontent/$skHash->{'sk_local_content_dir'}/g;
         $rsyc_cmd =~ s/=rcontent/$skHash->{'sk_remote_content_dir'}/g;
         $command = $command . "; " . $rsyc_cmd;
      }


      # return (priority, exec_command, exp_time)
if ($debug)
{
   print "Priority is: ", $actionHash->{'priority'}, "\n";
   print "Command is: ", $command, "\n";
}

      push @returnArray, [ ($actionHash->{'priority'}, $command, time +
         $seconds{"$interval"}) ];
   }
   return @returnArray;
}