#!/usr/bin/perl -w
use strict;

use Socket;
use POE qw(Wheel::SocketFactory Wheel::ReadWrite Filter::Stream);
use POSIX qw(EPIPE ENOTCONN);
use Time::HiRes qw(gettimeofday);
use Getopt::Long;

# Bit flags kept per connection side in {client_shutdown} / {server_shutdown};
# they are combined with | and tested with & by the shutdown helpers below.
use constant SHUT_READ  => 1;
use constant SHUT_WRITE => 2;

# Program version (shown by --version and in the control greeting).
my $VERSION = "0.01";
# Log file format version: written to the output header and compared against
# the "ReplayProxy: Version N" line of an input file.
my $FILE_VERSION = "1";

# Stamp type names, indexed from 1. Index 0 holds an empty placeholder so a
# false stamp type value can be treated as "not set".
my @stamp_name = qw(DateUT Offset);
unshift @stamp_name, "";
# Stamp type used when writing the log: index 2 ("Offset").
my $out_stamp_type = 2;

# Command line flags, file handles and the start time of the recording.
my ($help, $version, $unsafe, $verbose, $quiet, $in_stamp_type);
my ($input_file, $input_fh, $input_version, $output_file, $output_fh);
my ($start_seconds, $start_microseconds);
# Allow bundled single-letter options; stop option parsing at the first
# non-option argument so the listen specifications are left in @ARGV.
&Getopt::Long::config("bundling", "require_order");
die "Could not parse your command line" unless
    GetOptions("o=s"		=> \$output_file,
               "i=s"		=> \$input_file,
               "quiet!"		=> \$quiet,
               "q"		=> \$quiet,
               "verbose!"	=> \$verbose,	# Say what you are doing
               "v"		=> \$verbose,
               "version!"	=> \$version,
               "help!"		=> \$help,
               "h"		=> \$help,
               "unsafe!"	=> \$unsafe,
               "U"		=> \$unsafe);

# --version: print the banner and stop.
if ($version) {
    print<<"EOF";
ReplayProxy (Ton Utils) $VERSION
EOF
    exit 0;
}
# --help: show this script's POD through perldoc. Perl's script install
# directory is appended to PATH first so that perldoc can be found there.
if ($help) {
    require Config;
    $ENV{PATH} .= ":" unless $ENV{PATH} eq "";
    $ENV{PATH} = "$ENV{PATH}$Config::Config{'installscript'}";
    # exec only returns on failure, in which case we exit with status 1.
    exec("perldoc", "-F", $unsafe ? "-U" : (), $0) || exit 1;
    # make parser happy
    # (never reached; presumably a second mention of %Config::Config to avoid
    # a "used only once" warning -- TODO confirm)
    %Config::Config = ();
}
# -i: open the recorded file and validate its header right away.
if ($input_file) {
    open($input_fh, "<", $input_file) ||
        die "Could not open input file $input_file: $!\n";
    initial_input($input_fh);
}
# -o: create the log file, remember the start time (used by Offset stamps)
# and write the three header lines that initial_input() knows how to parse.
if ($output_file) {
    open($output_fh, ">", $output_file) ||
        die "Could not create output file $output_file: $!\n";
    ($start_seconds, $start_microseconds) = gettimeofday;
    print $output_fh "ReplayProxy: Version $FILE_VERSION\nStamp Type: $stamp_name[$out_stamp_type]\nStarted: ", cur_time($start_seconds, $start_microseconds),"\n";
}

package Listener;
# Registry of all listeners, keyed by numeric id. It is a file lexical, so
# code later in this file (the LIST control command) can read it as well.
my %listeners;
# Last id handed out; pre-incremented, so the first listener gets id 1.
my $listener_id;

# Constructor: create a listener with an empty copier table, give it the
# next id and record it in the registry.
sub new {
    my $class = shift;
    my $listener = bless { copiers => {} }, $class;
    $listener->{id} = ++$listener_id;
    $listeners{$listener_id} = $listener;
    return $listener;
}

# Create a Copier owned by this listener and remember it (keyed by the
# stringified object reference). Returns the new copier.
sub new_copier {
    my ($listener) = @_;
    my $copier = Copier->new($listener->{id});
    $listener->{copiers}{$copier} = $copier;
    return $copier;
}

# Class method: forget $copier in the listener with id $listener_id.
# Dies when no such listener is registered. Returns the removed entry.
sub drop {
    my ($class, $copier, $listener_id) = @_;
    my $listener = $listeners{$listener_id};
    die "Could not find listener $listener_id" unless $listener;
    return delete $listener->{copiers}{$copier};
}

# Write a one-line description of this listener to $wheel (anything with a
# put method), followed by one line per copier indented two extra spaces.
sub print {
    my ($listener, $wheel, $indent) = @_;
    my $table   = $listener->{copiers};
    my $copiers = keys %$table;
    my $plural  = $copiers == 1 ? "" : "s";
    my $colon   = $copiers ? ":" : "";
    my $line = sprintf("%sListener %d %s:%s(%s:%d)->%s:%s(%s) has %d copier%s%s",
                       $indent,
                       @$listener{qw(id from_host from_port from_address
                                     listen_port to_host to_port to_address)},
                       $copiers, $plural, $colon);
    $wheel->put($line);
    $indent .= "  ";
    for my $key (keys %$table) {
        $table->{$key}->print($wheel, $indent);
    }
}

# Remove this listener from the registry; dies if it was not registered.
# Returns the removed registry entry.
sub free {
    my ($listener) = @_;
    my $removed = delete $listeners{$listener->{id}};
    return $removed if $removed;
    die "Destroying listener $listener->{id} not in list";
}

package Copier;
# Registry of all copiers (one per proxied connection), keyed by numeric id.
my %copiers;
# Last id handed out; pre-incremented, so the first copier gets id 1.
my $copier_id;

# Constructor: create a copier that belongs to the listener with id
# $listener_id, give it the next id and record it in the registry.
sub new {
    my ($class, $listener_id) = @_;
    my $copier = bless { listener_id => $listener_id }, $class;
    $copier->{id} = ++$copier_id;
    $copiers{$copier_id} = $copier;
    return $copier;
}

# Write a one-line description of this copier to $wheel (anything with a
# put method), prefixed with $indent.
sub print {
    my ($copier, $wheel, $indent) = @_;
    my $line = sprintf("%sCopier %s: %s -> %s", $indent,
                       @$copier{qw(id client_remote server_remote)});
    $wheel->put($line);
}

# Detach this copier from its listener and from the registry.
# Dies if the listener is unknown or the copier was not registered.
sub free {
    my ($copier) = @_;
    Listener->drop($copier, $copier->{listener_id});
    my $removed = delete $copiers{$copier->{id}};
    return $removed if $removed;
    die "Destroying copier $copier->{id} not in list";
}

package main;

# Read and validate the header of a recorded ReplayProxy log.
#
# Header lines have the form "Name: value" and are read until the "Started"
# line, which ends the header:
#   ReplayProxy: Version N          format version, must equal $FILE_VERSION
#   Stamp Type:  DateUT | Offset    stored in $in_stamp_type as an index
#   Started:     YYYY/MM/DD hh:mm:ss[.fff] UT
#
# Dies with "File <name> line <n>: <reason>" on any malformed or unknown
# line, on a "Started" line that comes before a "Stamp Type" line, or when
# the file ends before a "Started" line was seen.
sub initial_input {
    my $input_fh = shift;

    local $_;
    eval {
        while (<$input_fh>) {
            # Strip the "Name:" prefix; what is left in $_ is the value.
            s/^\s*([^:]*[^:\s])\s*:\s*// || die "Could not parse\n";
            my $command = uc($1);
            $command =~ s/\s+/ /g;
            s/\s*$//;
            if ($command eq "STAMP TYPE") {
                for my $type (1..$#stamp_name) {
                    next unless /^\Q$stamp_name[$type]\E$/i;
                    $in_stamp_type = $type;
                    last;
                }
                $in_stamp_type || die "Unknown Stamp Type $_\n";
            } elsif ($command eq "REPLAYPROXY") {
                # The decimal point must be a literal dot. An unescaped "."
                # would accept any separator (e.g. "1x5") and hand a
                # non-numeric string to the == comparison below.
                ($input_version) = /^Version\s+(\d+(?:\.\d+)?)/i or
                    die "Could not parse as a version\n";
                $input_version == $FILE_VERSION ||
                    die "ReplayProxy file format $input_version not supported (I know about $FILE_VERSION)\n";
            } elsif ($command eq "STARTED") {
                # The sixth capture is seconds with an optional fraction.
                my ($year, $mon, $day, $hour, $min, $usec) =
                    m!^(\d+)/(\d+)/(\d+)\s+(\d+):(\d+):(\d+(?:\.\d+)?)\s+UT$!i
                    or die "Could not parse as a start time\n";
                # Split into whole seconds and microseconds. Nothing uses
                # the parsed start time yet; the line is only validated.
                my $sec = int $usec;
                $usec = 1e6*($usec-$sec);

                $in_stamp_type || die "No Stamp Type $_\n";
                # Leaves the eval block only; $@ is empty so the sub then
                # returns normally.
                return;
            } else {
                die "Unknown command $command\n";
            }
        }
        die "Unexpected EOF on $input_file\n";
    };
    die "File $input_file line $.: $@" if $@;
}

# _default handler shared by all sessions: events whose name starts with an
# underscore are ignored, any other unhandled event is a fatal error.
sub default {
    my $event = $_[ARG0];
    return if substr($event, 0, 1) eq "_";
    die("calling non existant event ", $event);
}

# Format a point in time as "YYYY/MM/DD hh:mm:ss.mmm UT".
#
# Arguments: ($seconds, $microseconds) since the epoch; when called without
# arguments the current time from gettimeofday is used.
# Returns the formatted string, rounded to the nearest millisecond.
sub cur_time {
    my ($seconds, $microseconds) = @_ ? @_ : gettimeofday();

    # Round to milliseconds before splitting the date: 999500 microseconds
    # or more rounds up to 1000 ms, which has to carry into the seconds
    # instead of being printed as a four digit fraction.
    my $millis = int($microseconds/1000 + 0.5);
    if ($millis >= 1000) {
        $millis -= 1000;
        $seconds++;
    }

    my ($sec, $min, $hour, $mday, $mon, $year) = gmtime($seconds);
    return sprintf("%04d/%02d/%02d %02d:%02d:%02d.%03d UT",
                   $year+1900, $mon+1, $mday, $hour, $min, $sec, $millis);
}

# Return the time stamp that prefixes every log line, for the current time,
# in the format selected by $out_stamp_type:
#   1  absolute UT date and time, "YYYY/MM/DD hh:mm:ss.mmm"
#   2  offset since the recording started, "seconds.mmm"
# Dies on any other stamp type.
sub stamp {
    my ($seconds, $microseconds) = gettimeofday();

    if ($out_stamp_type == 2) {
        # Offset relative to ($start_seconds, $start_microseconds).
        $seconds -= $start_seconds;
        $microseconds -= $start_microseconds;
        if ($microseconds < 0) {
            $microseconds += 1e6;
            $seconds--;
        }
    } elsif ($out_stamp_type != 1) {
        die "Unknown stamp_type $out_stamp_type\n";
    }

    # Round to milliseconds. 999500 microseconds or more rounds up to
    # 1000 ms, which must carry into the seconds rather than be printed as
    # a four digit fraction.
    my $millis = int($microseconds/1000 + 0.5);
    if ($millis >= 1000) {
        $millis -= 1000;
        $seconds++;
    }

    return sprintf("%d.%03d", $seconds, $millis) if $out_stamp_type == 2;

    my ($sec, $min, $hour, $mday, $mon, $year) = gmtime($seconds);
    return sprintf("%04d/%02d/%02d %02d:%02d:%02d.%03d",
                   $year+1900, $mon+1, $mday, $hour, $min, $sec, $millis);
}

# _start handler of a listener session.
#
# ARG0 is a listen specification "from/to" or just "to", where each side is
# "host", ":port", "port" or "host:port". Missing hosts become "localhost",
# missing ports become 0. Both hosts are resolved, the result is stored in
# the listener (the session heap), the listener is logged and a listening
# socket is opened on the "from" side.
sub start_listen {
    my ($listener, $arg) = @_[HEAP, ARG0];

    my ($from, $to) = $arg =~ m!^(?:([^/]*)/)?([^/]*)$! or
        die "Could not parse listen specification '$arg'\n";
    $from = "" unless defined $from;

    # Optional host (must contain a non-digit) and optional numeric port.
    my $endpoint = qr/^(?:((?=[^:]*\D)[\w.-]*)(?:$|:))?(\d*)$/;

    # Source side.
    my ($from_host, $from_port) = $from =~ $endpoint or
        die "Could not parse source socket specification '$from'\n";
    $from_host = "localhost" unless defined($from_host) && $from_host ne "";
    $listener->{from_host} = $from_host;
    my $from_packed = inet_aton($listener->{from_host}) ||
        die "Could not resolve '$listener->{from_host}'\n";
    $listener->{from_address} = inet_ntoa($from_packed);
    $listener->{from_port} = $from_port || 0;

    # Target side.
    my ($to_host, $to_port) = $to =~ $endpoint or
        die "Could not parse target socket specification '$to'\n";
    $to_host = "localhost" unless defined($to_host) && $to_host ne "";
    $listener->{to_host} = $to_host;
    my $to_packed = inet_aton($listener->{to_host}) ||
        die "Could not resolve '$listener->{to_host}'\n";
    $listener->{to_address} = inet_ntoa($to_packed);
    $listener->{to_port} = $to_port || 0;

    if ($output_fh) {
        print $output_fh stamp(), " Listener $listener->{id}: $listener->{from_host}:$listener->{from_address}:$listener->{from_port} $listener->{to_host}:$listener->{to_address}:$listener->{to_port}\n";
    }

    my $factory = POE::Wheel::SocketFactory->new(
        BindAddress  => $listener->{from_address},
        BindPort     => $listener->{from_port},
        SuccessEvent => "accepted",
        FailureEvent => "listener_failed",
    );
    $listener->{wheel} = $factory;

    # Report the port that was actually bound.
    my ($port, $addr) = sockaddr_in($factory->getsockname);
    $listener->{listen_port} = $port;
    printf STDERR "Listening on %s:%d\n", inet_ntoa($addr), $port if $port;
}

# FailureEvent handler of the listening socket: any failure is fatal.
# ARG0 is the name of the failed operation, ARG2 the error message.
sub listener_failed {
    my $listener  = $_[HEAP];
    my $operation = $_[ARG0];
    my $message   = $_[ARG2];
    die "$operation failed for listener $listener->{from_host}:$listener->{from_port}: $message\n";
}

# SuccessEvent handler of the listening socket: a client has connected.
# ARG0 is the accepted socket, ARG1/ARG2 the packed peer address and port.
# Starts a dedicated copier session for the connection; its heap is a new
# Copier object registered with this listener.
sub accepted {
    my ($listener, $socket, $addr, $port) = @_[HEAP, ARG0, ARG1, ARG2];
    $addr = inet_ntoa($addr) . ":" . $port;
    print STDERR "Accepted connection from $addr\n" if $verbose;

    # Announce the end of the copier session in verbose mode.
    my $on_stop = sub {
        my $copier = $_[HEAP];
        print STDERR "Connection $copier->{id} closed\n" if $verbose;
    };

    my %states = (
        _start            => \&start_copy,
        _stop             => $on_stop,
        _default          => \&default,
        connected         => \&connected,
        connection_failed => \&connection_failed,
        client_input      => \&client_input,
        client_error      => \&client_error,
        client_flush      => \&client_flush,
        server_input      => \&server_input,
        server_error      => \&server_error,
        server_flush      => \&server_flush,
    );

    POE::Session->create(
        inline_states => \%states,
        heap          => $listener->new_copier,
        args          => [$listener, $socket, $addr],
    );
}

# _start handler of a copier session.
# ARG0..ARG2 are the owning listener, the accepted client socket and the
# client's "address:port" string. Logs the connection, records the client
# side in the copier (the session heap) and starts connecting to the
# listener's target address.
sub start_copy {
    my ($copier, $listener, $socket, $remote_addr) = @_[HEAP, ARG0..ARG2];

    # Our own end of the accepted socket, as "address:port".
    my ($port, $addr) = sockaddr_in(getsockname($socket));
    my $local_addr = inet_ntoa($addr) . ":" . $port;

    if ($output_fh) {
        print $output_fh stamp(), " Connect $copier->{id} on $listener->{id}: $remote_addr -> $listener->{from_host}:$listener->{from_port}($local_addr) -> $listener->{to_host}($listener->{to_address}):$listener->{to_port}\n";
    }

    @$copier{qw(client_remote client_local client_socket listener)} =
        ($remote_addr, $local_addr, $socket, $listener);

    $copier->{wheel} = POE::Wheel::SocketFactory->new(
        RemoteAddress => $listener->{to_address},
        RemotePort    => $listener->{to_port},
        SuccessEvent  => "connected",
        FailureEvent  => "connection_failed",
    );
}

# FailureEvent handler of the outbound connect started by start_copy.
# ARG0 is the name of the failed operation, ARG2 the error message.
#
# Reports the failure (STDERR unless --quiet, and the log file if any) and
# then tears the half-built connection down. Without the teardown the
# accepted client socket would stay open indefinitely and the copier would
# remain listed in both the copier and the listener registries.
sub connection_failed {
    my ($copier, $operation, $message) = @_[HEAP, ARG0, ARG2];
    my $listener = $copier->{listener};
    print STDERR "$operation failed for connector $listener->{to_host}:$listener->{to_port}: $message\n" unless $quiet;
    print $output_fh stamp(), " Connect $copier->{id} failed: ($operation) $message\n" if
        $output_fh;

    # Only clean up while the connect wheel is still around, so a repeated
    # failure event cannot free the copier twice.
    if (delete $copier->{wheel}) {
        # Dropping our reference to the client socket lets it be closed.
        delete $copier->{client_socket};
        $copier->free;
    }
}

# SuccessEvent handler of the outbound connect: the target is reachable.
# ARG0 is the connected socket, ARG1/ARG2 the packed remote address and
# port. Records the server side in the copier, drops the connect wheel,
# resets the shutdown/flush state and starts read/write wheels on both
# sockets so data can be copied in both directions.
sub connected {
    my ($copier, $socket, $remote_addr, $remote_port) = @_[HEAP, ARG0..ARG2];

    $remote_addr = inet_ntoa($remote_addr) . ":" . $remote_port;
    my ($local_port, $local_addr) = sockaddr_in(getsockname($socket));
    $local_addr = inet_ntoa($local_addr) . ":" . $local_port;

    print STDERR "Connected from $local_addr\n" if $verbose;
    print $output_fh stamp(), " Connect $copier->{id} done\n" if $output_fh;

    @$copier{qw(server_local server_remote server_socket)} =
        ($local_addr, $remote_addr, $socket);
    delete $copier->{wheel};

    # Nothing is shut down and no flush is pending on either side yet.
    $copier->{$_} = 0 for
        qw(server_shutdown client_shutdown server_flush client_flush);

    my $client_wheel = POE::Wheel::ReadWrite->new(
        Handle     => $copier->{client_socket},
        Filter     => POE::Filter::Stream->new,
        InputEvent => "client_input",
        ErrorEvent => "client_error",
    );
    $copier->{client_wheel} = $client_wheel;

    my $server_wheel = POE::Wheel::ReadWrite->new(
        Handle     => $copier->{server_socket},
        Filter     => POE::Filter::Stream->new,
        InputEvent => "server_input",
        ErrorEvent => "server_error",
    );
    $copier->{server_wheel} = $server_wheel;
}

# Data arrived from the client (ARG0): forward it to the server and, when
# logging, record it as a "C" entry with its length. A newline is appended
# in the log when the data does not already end in one.
sub client_input {
    my ($copier, $input) = @_[HEAP, ARG0];
    $copier->{server_wheel}->put($input);
    return unless $output_fh;

    my $eol = substr($input, -1, 1) eq "\n" ? "" : "\n";
    print $output_fh stamp(), " C $copier->{id} ", length($input), ":\n", $input, $eol;
}

# Data arrived from the server (ARG0): forward it to the client and, when
# logging, record it as an "S" entry with its length. A newline is appended
# in the log when the data does not already end in one.
sub server_input {
    my ($copier, $input) = @_[HEAP, ARG0];
    $copier->{client_wheel}->put($input);
    return unless $output_fh;

    my $eol = substr($input, -1, 1) eq "\n" ? "" : "\n";
    print $output_fh stamp(), " S $copier->{id} ", length($input), ":\n", $input, $eol;
}

# FlushedEvent handler armed by server_nomore: all pending output to the
# server has been written, so stop watching for flushes, clear the pending
# marker and shut down the write side of the server socket.
sub server_flush {
    my ($copier) = $_[HEAP];
    my $wheel = $copier->{server_wheel};
    $wheel->event(FlushedEvent => undef);
    $copier->{server_flush} = 0;
    server_shut_write($copier);
}

# FlushedEvent handler armed by client_nomore: all pending output to the
# client has been written, so stop watching for flushes, clear the pending
# marker and shut down the write side of the client socket.
sub client_flush {
    my ($copier) = $_[HEAP];
    my $wheel = $copier->{client_wheel};
    $wheel->event(FlushedEvent => undef);
    $copier->{client_flush} = 0;
    client_shut_write($copier);
}

# The server side will not deliver anything more, so nothing new will be
# queued for the client. Output already queued must still be delivered:
# if any is pending, wait for the flush event; otherwise shut down the
# client's write side right away. Does nothing when that side is already
# shut down for writing.
sub client_nomore {
    my ($copier) = @_;
    return if $copier->{client_shutdown} & SHUT_WRITE;

    my $wheel = $copier->{client_wheel};
    return client_shut_write($copier) unless $wheel->get_driver_out_octets;

    # Stuff pending
    die "Already have a client flush pending" if $copier->{client_flush};
    $wheel->event(FlushedEvent => "client_flush");
    $copier->{client_flush} = 1;
}

# The client side will not deliver anything more, so nothing new will be
# queued for the server. If output is still pending, wait for the flush
# event; otherwise shut down the server's write side right away. Does
# nothing when that side is already shut down for writing.
sub server_nomore {
    my ($copier) = @_;
    return if $copier->{server_shutdown} & SHUT_WRITE;

    my $wheel = $copier->{server_wheel};
    return server_shut_write($copier) unless $wheel->get_driver_out_octets;

    # Stuff pending
    die "Already have a server flush pending" if $copier->{server_flush};
    $wheel->event(FlushedEvent => "server_flush");
    $copier->{server_flush} = 1;
}

# Stop reading from the client.
# If the client side is already shut down for writing, this completes the
# close. Otherwise the read half of the socket is shut down, the state is
# recorded and the server is told that no more data will follow.
# Dies when called twice or when shutdown() fails.
sub client_shut_read {
    my ($copier) = @_;
    my $state = $copier->{client_shutdown};
    die "client already shutdown for read" if $state & SHUT_READ;
    return client_close($copier) if $state == SHUT_WRITE;

    shutdown($copier->{client_socket}, 0) or
        die "Could not shutdown client: $!\n";
    $copier->{client_shutdown} |= SHUT_READ;
    server_nomore($copier);
}

# Stop reading from the server.
# If the server side is already shut down for writing, this completes the
# close. Otherwise the read half of the socket is shut down, the state is
# recorded and the client is told that no more data will follow.
# Dies when called twice or when shutdown() fails.
sub server_shut_read {
    my ($copier) = @_;
    my $state = $copier->{server_shutdown};
    die "server already shutdown for read" if $state & SHUT_READ;
    return server_close($copier) if $state == SHUT_WRITE;

    shutdown($copier->{server_socket}, 0) or
        die "Could not shutdown server: $!\n";
    $copier->{server_shutdown} |= SHUT_READ;
    client_nomore($copier);
}

# Stop writing to the client.
# If the client side is already shut down for reading, this completes the
# close. Otherwise the write half of the socket is shut down; on success
# the state is recorded and reading from the server is stopped as well
# (unless that already happened). A shutdown() failure with ENOTCONN is
# handled by closing the client side; any other failure is fatal.
# Dies when called twice.
sub client_shut_write {
    my ($copier) = @_;
    my $state = $copier->{client_shutdown};
    die "client already shutdown for write" if $state & SHUT_WRITE;
    return client_close($copier) if $state == SHUT_READ;

    unless (shutdown($copier->{client_socket}, 1)) {
        die "Could not shutdown client: $!\n" unless $! == ENOTCONN;
        return client_close($copier);
    }

    $copier->{client_shutdown} |= SHUT_WRITE;
    server_shut_read($copier) if !($copier->{server_shutdown} & SHUT_READ);
}

# Stop writing to the server.
# If the server side is already shut down for reading, this completes the
# close. Otherwise the write half of the socket is shut down; on success
# the state is recorded and reading from the client is stopped as well
# (unless that already happened). A shutdown() failure with ENOTCONN is
# handled by closing the server side; any other failure is fatal.
# Dies when called twice.
sub server_shut_write {
    my ($copier) = @_;
    my $state = $copier->{server_shutdown};
    die "server already shutdown for write" if $state & SHUT_WRITE;
    return server_close($copier) if $state == SHUT_READ;

    unless (shutdown($copier->{server_socket}, 1)) {
        die "Could not shutdown server: $!\n" unless $! == ENOTCONN;
        return server_close($copier);
    }

    $copier->{server_shutdown} |= SHUT_WRITE;
    client_shut_read($copier) if !($copier->{client_shutdown} & SHUT_READ);
}

# ErrorEvent handler of the client wheel.
# ARG0..ARG2 are the failed operation, the numeric errno and the message.
# The event is logged as "ClientEnd"; errno 0 (EOF) stops reading from the
# client, EPIPE stops writing to it, and anything else is reported (unless
# --quiet) and closes the client side completely.
sub client_error {
    my ($copier, $operation, $errno, $message) = @_[HEAP, ARG0..ARG2];

    if ($output_fh) {
        print $output_fh stamp(), " ClientEnd $copier->{id} $operation:$errno:$message\n";
    }

    return client_shut_read($copier)  if $errno == 0;      # EOF
    return client_shut_write($copier) if $errno == EPIPE;

    print STDERR "Client error: $operation: $message\n" if !$quiet;
    client_close($copier);
}

# ErrorEvent handler of the server wheel.
# ARG0..ARG2 are the failed operation, the numeric errno and the message.
# The event is logged as "ServerEnd"; errno 0 (EOF) stops reading from the
# server, EPIPE stops writing to it, and anything else is reported (unless
# --quiet) and closes the server side completely.
sub server_error {
    my ($copier, $operation, $errno, $message) = @_[HEAP, ARG0..ARG2];

    if ($output_fh) {
        print $output_fh stamp(), " ServerEnd $copier->{id} $operation:$errno:$message\n";
    }

    return server_shut_read($copier)  if $errno == 0;      # EOF
    return server_shut_write($copier) if $errno == EPIPE;

    print STDERR "Server error: $operation: $message\n" if !$quiet;
    server_close($copier);
}

# Close the client side of a copier completely.
#
# Marks the client as shut down in both directions, drops the client wheel
# and socket, and then propagates to the server side based on the shutdown
# state the client had on entry:
#   - client was not yet shut down for reading: the server will get no more
#     data, so server_nomore() is called;
#   - client was not yet shut down for writing and the server is not yet
#     shut down for reading: reading from the server is stopped.
sub client_close {
    my $copier = shift;
    # NOTE(review): this trace line is printed unconditionally, regardless
    # of $quiet / $verbose -- looks like leftover debugging output.
    print STDERR "client_close\n";
    # Remember the state on entry; it decides what is propagated below.
    my $shutdown = $copier->{client_shutdown};
    $copier->{client_shutdown} = SHUT_READ | SHUT_WRITE;
    # $copier->{client_flush} = 0;
    # If the server wheel is already gone, both sides are now closed:
    # remove the copier from the copier and listener registries.
    $copier->free if !$copier->{server_wheel};
    delete $copier->{client_wheel};
    delete $copier->{client_socket};
    server_nomore($copier)    unless $shutdown & SHUT_READ;
    server_shut_read($copier) unless
        $shutdown & SHUT_WRITE || $copier->{server_shutdown} & SHUT_READ;
}

# Close the server side of a copier completely.
#
# Marks the server as shut down in both directions, drops the server wheel
# and socket, and then propagates to the client side based on the shutdown
# state the server had on entry:
#   - server was not yet shut down for reading: the client will get no more
#     data, so client_nomore() is called;
#   - server was not yet shut down for writing and the client is not yet
#     shut down for reading: reading from the client is stopped.
sub server_close {
    my $copier = shift;
    # NOTE(review): this trace line is printed unconditionally, regardless
    # of $quiet / $verbose -- looks like leftover debugging output.
    print STDERR "server_close\n";
    # Remember the state on entry; it decides what is propagated below.
    my $shutdown = $copier->{server_shutdown};
    $copier->{server_shutdown} = SHUT_READ | SHUT_WRITE;
    # $copier->{server_flush} = 0;
    # If the client wheel is already gone, both sides are now closed:
    # remove the copier from the copier and listener registries.
    $copier->free if !$copier->{client_wheel};
    delete($copier->{server_wheel});
    delete $copier->{server_socket};
    client_nomore($copier)    unless $shutdown & SHUT_READ;
    client_shut_read($copier) unless
        $shutdown & SHUT_WRITE || $copier->{client_shutdown} & SHUT_READ;
}

# _start handler of a control session.
# ARG0/ARG1 are the handles commands are read from and replies are written
# to; ARG2 ("must") is stored for command_error, which ends the program
# when it is true. Starts a read/write wheel and sends the greeting.
sub start_command {
    my ($control, $in_socket, $out_socket, $must) = @_[HEAP, ARG0..ARG2];

    @$control{qw(in_socket out_socket must)} =
        ($in_socket, $out_socket, $must);

    my $wheel = POE::Wheel::ReadWrite->new(
        InputHandle  => $control->{in_socket},
        OutputHandle => $control->{out_socket},
        InputEvent   => "command_input",
        ErrorEvent   => "command_error",
    );
    $control->{wheel} = $wheel;
    $wheel->put("200 Welcome to ReplaySocket $VERSION.");
}

# Control commands: upper-case command name => handler. command_input()
# accepts any unambiguous prefix of a name and calls the handler with
# ($control, $command_as_typed, @arguments).
my %command_dispatch =
    (QUIT => \&quit,
     DATE => \&date,
     LIST => \&list,
     HELP => \&help,
     "?"  => \&help);

# End the program. When a log file is open, a final "Finished" line is
# written to it first, with autoflush switched on for that handle.
sub leave {
    exit unless $output_fh;

    my $fh = select($output_fh);
    $| = 1;
    print stamp(), " Finished\n";
    select($fh);
    exit;
}

# QUIT command: acknowledge with "205 ." and end the program.
sub quit {
    my ($control) = @_;
    $control->{wheel}->put("205 .");
    leave();
}

# DATE command: reply with the current time as formatted by cur_time().
sub date {
    my ($control) = @_;
    my $now = cur_time();
    $control->{wheel}->put($now);
}

# LIST command: describe every registered listener (and its copiers) on
# the control connection.
sub list {
    my ($control) = @_;
    my $wheel = $control->{wheel};
    for my $listener (values %listeners) {
        $listener->print($wheel, "");
    }
}

# HELP / ? command: list the known command names in sorted order, framed
# by a "100" header line and a terminating ".".
sub help {
    my ($control) = @_;
    my $wheel = $control->{wheel};
    $wheel->put("100 Legal commands");
    foreach my $name (sort keys %command_dispatch) {
        $wheel->put("  $name");
    }
    $wheel->put(".");
}

# InputEvent handler of the control connection: ARG0 is one input line.
# The first word is the command (case-insensitive, any unambiguous prefix
# of a known command name); the remaining words are its arguments. Empty
# lines are ignored. Unknown commands get a "500" reply, ambiguous
# prefixes a "501" reply listing the candidates.
sub command_input {
    my ($control, $input) = @_[HEAP, ARG0];

    my @words = split " ", $input;
    return unless @words;
    my ($command, @args) = @words;

    my $c = uc($command);
    my @candidates = grep { /^\Q$c/ } keys %command_dispatch;

    if (!@candidates) {
        $control->{wheel}->put("500 Unknown command $command");
    } elsif (@candidates > 1) {
        $control->{wheel}->put("501 Ambiguous command $command: @candidates");
    } else {
        my $handler = $command_dispatch{$candidates[0]};
        $handler->($control, $command, @args);
    }
}

# ErrorEvent handler of the control connection.
# ARG0..ARG2 are the failed operation, the numeric errno and the message.
# Real errors (non-zero errno) are reported unless --quiet. When the
# control connection is mandatory the program ends; otherwise only the
# wheel is dropped.
sub command_error {
    my ($control, $operation, $errno, $message) = @_[HEAP, ARG0..ARG2];

    unless (!$errno || $quiet) {
        print STDERR "Control connection: $operation: $message\n";
    }
    leave() if $control->{must};
    delete $control->{wheel};
}

# One listener session per remaining command line argument; each argument
# is a listen specification that start_listen() parses. The session heap is
# a fresh Listener object.
POE::Session->create
    (inline_states => {
        _start		=> \&start_listen,
        accepted	=> \&accepted,
        listener_failed => \&listener_failed,
        _default	=> \&default,
    },
     heap => Listener->new,
     args => [$_]) for @ARGV;

# Control session reading commands from STDIN and replying on STDOUT.
# The third argument (1) is stored as {must}, which makes command_error()
# end the program when this connection ends.
POE::Session->create
    (inline_states => {
        _start		=> \&start_command,
        _default	=> \&default,
        command_input	=> \&command_input,
        command_error	=> \&command_error,
    },
     heap => bless({}, "Command"),
     args => [\*STDIN, \*STDOUT, 1]);


# Run the POE event loop.
$poe_kernel->run();
__END__

=head1 NAME

ReplayProxy - Log and replay TCP/IP connections

=head1 SYNOPSIS

ReplayProxy [-i input_file] [-o output_file] {from/to | to}

 from, to := host | :port | host:port

 host defaults to localhost
 port defaults to 0

=head1 BUGS

Replay is not implemented yet; only logging works.

=head1 SEE ALSO

L<rport(1)>

=head1 AUTHOR

Ton Hospel, E<lt>ReplayProxy@ton.iguana.beE<gt>

=head1 COPYRIGHT AND LICENSE

Copyright (C) 2005 by Ton Hospel

This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself, either Perl version 5.6 or,
at your option, any later version of Perl 5 you may have available.

=cut
