DZone Snippets is a public source code repository. Easily build up your personal collection of code snippets, categorize them with tags / keywords, and share them with the world

Snippets has posted 5883 posts at DZone. View Full User Profile

Synchronize The Clocks Of Distributed Java Clients On A LAN

05.14.2010
| 3482 views |
  • submit to reddit
        Distributed UDP/Multicast class for figuring out the clock differences on a LAN
Replace System.currentTimeMillis() with ClockSync.getAdjustedTime() which will gradually adapt once startSync() is called
Should work with any number of LAN clients joining/leaving.

No server, totally distributed.  Would love ideas on how to improve it!

/**
 * @author Benjamin Hill
 */
public class ClockSync {

  /** 
  * The reason for this to exist.  Use instead of System.currentTimeMillis()
  */
  public static long getAdjustedTime() {
    return System.currentTimeMillis() + timerOffset;
  }


  /**
   * Sends out a timestamped multicast "ping"
   */
  final class BroadcastTask extends TimerTask {

    @Override
    public void run() {
      ClockSync.this.bout.putShort(PACKET_CLASS);
      ClockSync.this.bout.putInt(ClockSync.this.myId);
      ClockSync.this.bout.putLong(ClockSync.getAdjustedTime());
      final DatagramPacket dp = new DatagramPacket(ClockSync.this.bout.array(), ClockSync.this.bout.position(), ClockSync.this.group, UPNP_MULTI_PORT);
      try {
        ClockSync.this.ms.send(dp);
        ClockSync.this.bout.clear();
      } catch (final IOException ex) {
        logger.log(Level.SEVERE, null, ex);
      }
      logger.finer("Sent ClockSync ping");
    }
  }

  /**
   * Blocking thread for getting timer packets
   */
  final class ReceiveThread implements Runnable {

    boolean running = false;

    @Override
    public void run() {
      this.running = true;
      while (this.running) {
        try {
          ClockSync.this.bin.clear();
          final DatagramPacket dp = new DatagramPacket(ClockSync.this.bin.array(), ClockSync.this.bin.capacity());
          logger.finest("Waiting for next packet");
          ClockSync.this.ms.receive(dp);  // BLOCK HERE
          ClockSync.this.bin.rewind();
          final long now = ClockSync.getAdjustedTime();
          final short pclass = ClockSync.this.bin.getShort();
          if (PACKET_CLASS != pclass) {
            logger.finest("Random packet, skipping");
            continue;
          }

          final int id = ClockSync.this.bin.getInt();
          if (id == ClockSync.this.myId) {
            logger.finest("My own packet, skipping");
            continue;
          }

          final long ts = ClockSync.this.bin.getLong();
          if (now >= ts) {
            logger.finest("Other peer is behind me, skipping");
            continue;
          }

          logger.log(Level.FINER, "Got Relevent Packet:{0} {1} {2}", new Object[]{pclass, id, ts});
          final long ahead = ts - now;
          timerOffset += (ahead >> 1);
          logger.log(Level.INFO, "Other peer is {0} ahead of me, catching up halfway with a new offset of {1}", new Object[]{ahead, timerOffset});
        } catch (final IOException ex) {
          logger.severe(ex.getLocalizedMessage());
          this.running = false;
        }
      }
    }
  }
  protected static volatile long timerOffset = 0;
  static final Logger logger = Logger.getLogger(ClockSync.class.getName());
  final static short PACKET_CLASS = 2700; // random
  static final Random r = new Random();
  static final String UPNP_ADDRESS = "239.255.255.250";
  static final int UPNP_MULTI_PORT = 1900;

  final ByteBuffer bin, bout;
  final int BUFF_SIZE = Short.SIZE + Integer.SIZE + Long.SIZE; // Packet class + machine ID + Time
  final InetAddress group;
  final MulticastSocket ms;
  final int myId;
  final ReceiveThread rc = new ReceiveThread();
  final Thread recThread = new Thread(rc);
  final Timer broadTask = new Timer();

  public ClockSync() throws IOException {
    this.ms = new MulticastSocket(null);
    this.group = InetAddress.getByName(UPNP_ADDRESS);
    this.ms.setTimeToLive(4);
    this.ms.setSoTimeout(0);
    this.ms.setLoopbackMode(true);
    this.ms.setReuseAddress(true);
    if (!this.ms.getReuseAddress()) {
      logger.warning("MS Socket can't reuse address");
    }
    this.ms.bind(new InetSocketAddress(InetAddress.getByName("0.0.0.0"), UPNP_MULTI_PORT));
    this.ms.joinGroup(this.group);

    this.bin = ByteBuffer.allocate(this.BUFF_SIZE);
    this.bout = ByteBuffer.allocate(this.BUFF_SIZE);

    this.myId = r.nextInt();
  }

  /**
   * Don't forget to run this!
   */
  public void startSync() {
    this.recThread.start();
    this.broadTask.schedule(new BroadcastTask(), 0, 5000);
  }

  /**
   * Don't bother stopping it if you are ok with the overhead, in case new clients join
   */
  public void stopSync() {
    this.rc.running = false;
    this.broadTask.cancel();
  }

  @Override
  protected void finalize() throws Throwable {
    super.finalize();
    if (!this.ms.isClosed()) {
      if (this.ms.isBound() && !this.ms.isClosed()) {
        this.ms.leaveGroup(this.group);
        this.ms.close();
      }
    }
  }
}