Skip to content

Commit

Permalink
Rework PA again to not try and sort the passiveLog. Instead, we itera…
Browse files Browse the repository at this point in the history
…te and store "last" data in a new database table.
  • Loading branch information
perlDreamer committed Oct 6, 2011
1 parent 5314530 commit 6ed275b
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 46 deletions.
20 changes: 20 additions & 0 deletions docs/upgrades/upgrade_7.10.23-7.10.24.pl
Expand Up @@ -31,10 +31,30 @@ BEGIN
my $session = start(); # this line required

# upgrade functions go here
addPALastLogTable($session);

finish($session); # this line required


#----------------------------------------------------------------------------
# Create the PA_lastLog table, which holds one "last seen" row per user and session for Passive Analytics.
sub addPALastLogTable {
    # Upgrade step: create the PA_lastLog table.
    #
    # The table is keyed on (userId, sessionId) and carries the assetId,
    # timeStamp and url of a single log entry for that pair.
    #
    # Parameters:
    #   $session - session object; only $session->db->write is used here.
    #
    # Progress messages are printed unless the file-level $quiet flag is set.
    my $session = shift;
    print "\tAdd a table to keep track of additional Passive Analytics data... " unless $quiet;

    # IF NOT EXISTS keeps the step idempotent: re-running the upgrade against
    # a database that already has the table no longer aborts the script.
    # The heredoc is single-quoted so the SQL is passed through verbatim and
    # nothing in it can ever be interpolated by Perl.
    $session->db->write(<<'EOSQL');
CREATE TABLE IF NOT EXISTS `PA_lastLog` (
    `userId`    char(22) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
    `assetId`   char(22) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
    `sessionId` char(22) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,
    `timeStamp` bigint(20) DEFAULT NULL,
    `url`       char(255) NOT NULL,
    PRIMARY KEY (userId, sessionId)
) ENGINE=MyISAM DEFAULT CHARSET=utf8
EOSQL
    print "DONE!\n" unless $quiet;
}

#----------------------------------------------------------------------------
# Describe what our function does
#sub exampleFunction {
Expand Down
2 changes: 1 addition & 1 deletion lib/WebGUI/Workflow/Activity/BucketPassiveAnalytics.pm
Expand Up @@ -61,7 +61,7 @@ Return a statement handle at the desired offset.

sub get_statement {
# Run the deltaLog read for one chunk of rows and return the result of
# $session->db->read for it.
#   $session  - object providing ->db->read(sql, placeholders).
#   $logIndex - number of rows to skip; bound to the LIMIT offset placeholder
#               ("+0" forces it into a number).
my ($session, $logIndex) = @_;
# Pre-change query (ordered by timestamp). It is shadowed by the declaration
# on the next statement, so it is never executed.
my $deltaSql = q{select SQL_CALC_FOUND_ROWS userId, assetId, url, delta, from_unixtime(timeStamp) as stamp from deltaLog order by timestamp limit ?, 500000};
# Query in effect: same columns, no ORDER BY, at most 500000 rows from the offset.
# NOTE(review): MySQL gives no row-order guarantee for LIMIT without ORDER BY --
# confirm that offset paging across chunks cannot skip or repeat rows.
my $deltaSql = q{select SQL_CALC_FOUND_ROWS userId, assetId, url, delta, from_unixtime(timeStamp) as stamp from deltaLog limit ?, 500000};
my $sth = $session->db->read($deltaSql, [$logIndex+0]);
return $sth;
}
Expand Down
61 changes: 22 additions & 39 deletions lib/WebGUI/Workflow/Activity/SummarizePassiveAnalytics.pm
Expand Up @@ -59,7 +59,7 @@ Return a statement handle at the desired offset.

sub get_statement {
# Run the passiveLog read for one chunk of rows and return its statement handle.
#   $session - object providing ->db->read(sql, placeholders).
#   $counter - number of rows to skip; bound to the LIMIT offset placeholder
#              ("+0" forces it into a number).
my ($session, $counter) = @_;
# Pre-change query (sorted by userId, sessionId, timeStamp). It is shadowed by
# the declaration on the next statement, so it is never executed.
my $passive = q{select SQL_CALC_FOUND_ROWS * from passiveLog where userId <> '1' order by userId, sessionId, timeStamp limit ?, 500000};
# Query in effect: every passiveLog column for all users except userId '1',
# unsorted, at most 500000 rows from the offset. SQL_CALC_FOUND_ROWS makes the
# un-LIMITed row count available through found_rows().
# NOTE(review): MySQL gives no row-order guarantee for LIMIT without ORDER BY --
# confirm that offset paging across chunks cannot skip or repeat rows, and that
# rows for a session arrive in timeStamp order.
my $passive = q{select SQL_CALC_FOUND_ROWS * from passiveLog where userId <> '1' limit ?, 500000};
my $sth = $session->db->read($passive, [$counter+0]);
return $sth;
}
Expand Down Expand Up @@ -93,68 +93,51 @@ sub execute {
my $lastUrl;
my $counter = $instance->getScratch('counter');
my $sth = get_statement($session, $counter);
if ($counter) {
$lastUserId = $instance->getScratch('lastUserId');
$lastSessionId = $instance->getScratch('lastSessionId');
$lastTimeStamp = $instance->getScratch('lastTimeStamp');
$lastAssetId = $instance->getScratch('lastAssetId');
$lastUrl = $instance->getScratch('lastUrl');
}
else {
my $logLine = $sth->hashRef();
$lastUserId = $logLine->{userId};
$lastSessionId = $logLine->{sessionId};
$lastTimeStamp = $logLine->{timeStamp};
$lastAssetId = $logLine->{assetId};
$lastUrl = $logLine->{url};
$session->db->write('delete from deltaLog'); ##Only if we're starting out
if (! $counter) { #Clean up from last time, just in case
$session->db->write('delete from deltaLog');
$session->db->write('delete from PA_lastLog');
}

my $total_rows = $session->db->quickScalar('select found_rows()');

my $deltaLog = $session->db->prepare('insert into deltaLog (userId, assetId, delta, timeStamp, url) VALUES (?,?,?,?,?)');
my $deltaLog = $session->db->prepare('insert into deltaLog (userId, assetId, timeStamp, url, delta) VALUES (?,?,?,?,?)');
my $recordLast = $session->db->prepare('REPLACE INTO PA_lastLog (userId, sessionId, timeStamp, url) VALUES (?,?,?,?)');
my $fetchLast = $session->db->prepare('select * from PA_lastLog where sessionId=? and userId=?');

my $expired = 0;
LOG_CHUNK: while (1) {
LOG_ENTRY: while (my $logLine = $sth->hashRef()) {
$counter++;
my $delta = $logLine->{timeStamp} - $lastTimeStamp;
if ( $logLine->{userId} eq $lastUserId
&& $logLine->{sessionId} eq $lastSessionId
&& $delta < $deltaInterval ) {
$deltaLog->execute([$lastUserId, $lastAssetId, $delta, $lastTimeStamp, $lastUrl]);
$fetchLast->execute([@{$logLine}{qw/sessionId userId/}]);
my $lastLine = $fetchLast->hashRef();
$recordLast->execute([ (@{ $logLine }{qw/userId sessionId timeStamp url/}) ]);
if ($lastLine->{timeStamp}) {
my $delta = $logLine->{timeStamp} - $lastLine->{timeStamp};
$deltaLog->execute([ (@{ $lastLine }{qw/userId assetId timeStamp url/}), $delta]);
}
$lastUserId = $logLine->{userId};
$lastSessionId = $logLine->{sessionId};
$lastTimeStamp = $logLine->{timeStamp};
$lastAssetId = $logLine->{assetId};
$lastUrl = $logLine->{url};
if (time() > $endTime) {
$instance->setScratch('lastUserId', $lastUserId);
$instance->setScratch('lastSessionId', $lastSessionId);
$instance->setScratch('lastTimeStamp', $lastTimeStamp);
$instance->setScratch('lastAssetId', $lastAssetId);
$instance->setScratch('lastUrl', $lastUrl);
$instance->setScratch('counter', $counter);
$expired = 1;
last LOG_ENTRY;
}
}

$sth->finish;
if ($expired) {
$deltaLog->finish;
$recordLast->finish;
$fetchLast->finish;
$sth->finish;
return $self->WAITING(1);
}
last LOG_CHUNK if $counter >= $total_rows;
$sth = get_statement($session, $counter);
}

$instance->deleteScratch('lastUserId');
$instance->deleteScratch('lastSessionId');
$instance->deleteScratch('lastTimeStamp');
$instance->deleteScratch('lastAssetId');
$instance->deleteScratch('lastUrl');
$instance->deleteScratch('counter');
$deltaLog->finish;
$recordLast->finish;
$fetchLast->finish;
$sth->finish;
$session->db->write('delete from PA_lastLog');
return $self->COMPLETE;
}

Expand Down
54 changes: 48 additions & 6 deletions t/Workflow/Activity/BucketPassiveAnalytics.t
Expand Up @@ -5,14 +5,13 @@ use lib "$FindBin::Bin/../../lib";
#use DB;

use WebGUI::Test;
use WebGUI::Asset;
use WebGUI::PassiveAnalytics::Rule;
use WebGUI::Workflow::Activity::BucketPassiveAnalytics;
use WebGUI::Text;

use Test::More;
use Test::Deep;
use Data::Dumper;

plan tests => 1; # increment this value for each test you create
plan tests => 2; # increment this value for each test you create

my $session = WebGUI::Test->session;
$session->user({userId => 3});
Expand All @@ -21,6 +20,7 @@ WebGUI::Test->addToCleanup(SQL => 'delete from passiveLog');
WebGUI::Test->addToCleanup(SQL => 'delete from deltaLog');
WebGUI::Test->addToCleanup(SQL => 'delete from bucketLog');
WebGUI::Test->addToCleanup(SQL => 'delete from analyticRule');
WebGUI::Test->addToCleanup(SQL => 'delete from PA_lastLog');

my $workflow = WebGUI::Workflow->new($session, 'PassiveAnalytics000001');
my $activities = $workflow->getActivities();
Expand Down Expand Up @@ -67,7 +67,8 @@ while (my $spec = shift @url2) {
}

my @urls = map {$_->[1]} @ruleSets;
loadLogData($session, @urls);
#loadLogData($session, @urls);
repeatableLogData($session, 'passiveAnalyticsLog');

##Build rulesets

Expand All @@ -80,7 +81,28 @@ PAUSE: while (my $retval = $instance->run()) {
}
#DB::disable_profile();

ok(1, 'One test');
cmp_ok $counter, '<', 16, 'Successful completion of PA';

# Read back every (userId, Bucket, duration) row that the workflow wrote.
my $get_line = $session->db->read('select userId, Bucket, duration from bucketLog');

# Collect the rows as array refs; the loop stops at the first empty fetch.
my @database_dump = ();
while ( my @datum = $get_line->array() ) {
    push @database_dump, [ @datum ];
}

# Row order coming out of the database is irrelevant, hence cmp_bag.
cmp_bag(
    [ @database_dump ],
    [
        [ 'user1', 'one',   10 ],
        [ 'user1', 'two',   15 ],
        [ 'user2', 'zero',  2  ],
        [ 'user2', 'uno',   3  ],
        [ 'user2', 'Other', 5  ],
    ],
    'PA analysis completed, and calculated correctly'
) or diag Dumper(\@database_dump);

sub loadLogData {
my ($session, @urls) = @_;
Expand All @@ -100,4 +122,24 @@ sub loadLogData {
}
}

sub repeatableLogData {
    # Replace the contents of passiveLog with the rows of a collateral file,
    # giving the test a fixed, repeatable data set.
    #
    # Parameters:
    #   $session     - session object; ->db->write and ->db->prepare are used.
    #   $dataLogName - name of the collateral file to load. Falls back to
    #                  'passiveAnalyticsLog' when not given.
    #
    # File format: one entry per line, whitespace separated, in the order
    # userId, sessionId, timeStamp, url. Lines starting with '#' are skipped,
    # a '#' later in a line starts a trailing comment, and blank lines are
    # ignored. Every row is stored with the literal assetId 'assetId'.
    #
    # Dies if the collateral file cannot be opened.
    my ($session, $dataLogName) = @_;
    $dataLogName = 'passiveAnalyticsLog' unless defined $dataLogName;

    $session->db->write('delete from passiveLog');
    my $insert = $session->db->prepare(
        q!insert into passiveLog (userId, sessionId, timeStamp, url, assetId) VALUES (?,?,?,?,'assetId')!
    );

    # Honour the requested file name instead of a hard-coded one.
    my $data_name = WebGUI::Test::collateral($dataLogName);
    open my $log_data, '<', $data_name
        or die "Unable to open $data_name for reading: $!";

    LINE: while (my $line = <$log_data>) {
        next LINE if $line =~ /^\s*#/;

        # Strip a trailing comment. The pattern must be '.*' (any text);
        # '\.*' would only match a run of literal dots after the '#'.
        $line =~ s/#.*$//;

        # split ' ' drops leading whitespace and the trailing newline.
        my @data = split ' ', $line;

        # A blank line would otherwise call execute with no bind values.
        next LINE unless @data;

        $insert->execute([@data]);
    }

    close $log_data;
    $insert->finish;
}

#vim:ft=perl
8 changes: 8 additions & 0 deletions t/supporting_collateral/passiveAnalyticsLog
@@ -0,0 +1,8 @@
#user session timestamp url
user1 session11 100 /one
user1 session11 110 /two
user1 session11 125 /three
user2 session21 200 /yelnats
user2 session21 202 /one/uno
user2 session21 205 /whatever
user2 session21 210 /something_else

0 comments on commit 6ed275b

Please sign in to comment.