Skip to content

Commit 45b5370

Browse files
committed Sep 26, 2011
Allow Passive Analytics to scale to analyzing millions of log entries without nuking MySQL in the process. More thorough test cleanup.
1 parent ec00e86 commit 45b5370

File tree

3 files changed

+99
-64
lines changed

3 files changed

+99
-64
lines changed
 

‎lib/WebGUI/Workflow/Activity/BucketPassiveAnalytics.pm

+49-33
Original file line number | Diff line number | Diff line change
@@ -51,6 +51,20 @@ sub definition {
5151
return $class->SUPER::definition($session,$definition);
5252
}
5353

54+
#-------------------------------------------------------------------
55+
56+
=head2 get_statement( session, logIndex [, chunkSize] )

Run the deltaLog query for one chunk and return the resulting statement handle.

=over 4

=item session

The WebGUI session whose database handle runs the query.

=item logIndex

Zero-based row offset at which the chunk starts. C<undef> (no saved offset yet)
is treated as 0 rather than producing an uninitialized-value warning.

=item chunkSize

Optional maximum number of rows to fetch. Defaults to 500000. A missing value,
or one less than 1, falls back to the default, because a chunk that returns no
rows would never advance the caller's offset.

=back

The query uses C<SQL_CALC_FOUND_ROWS>, so the caller can read the total number
of matching rows (ignoring the C<LIMIT>) with C<select found_rows()>. MySQL only
reports that value for the most recent SELECT, so it must be read before any
other SELECT runs on the same connection.

=cut

sub get_statement {
    my ($session, $logIndex, $chunkSize) = @_;

    # Bounded chunks keep each result set small instead of streaming the whole
    # log through a single statement handle.
    my $limit  = (defined $chunkSize && $chunkSize >= 1) ? int($chunkSize) : 500_000;
    my $offset = defined $logIndex ? $logIndex + 0 : 0;

    # NOTE(review): timestamps may not be unique, so rows that share a
    # timestamp could change order between chunk queries. TODO confirm whether
    # deltaLog has a unique column that could be appended to the ORDER BY.
    # %d guarantees only an integer is interpolated into the LIMIT clause.
    my $deltaSql = sprintf(
        q{select SQL_CALC_FOUND_ROWS userId, assetId, url, delta, from_unixtime(timeStamp) as stamp from deltaLog order by timestamp limit ?, %d},
        $limit,
    );

    # The offset is bound as a placeholder; it was forced numeric above.
    return $session->db->read($deltaSql, [$offset]);
}
5468

5569
#-------------------------------------------------------------------
5670

@@ -85,47 +99,49 @@ sub execute {
8599
my %bucketCache = ();
86100

87101
##Configure all the SQL
88-
my $deltaSql = <<"EOSQL1";
89-
select userId, assetId, url, delta, from_unixtime(timeStamp) as stamp
90-
from deltaLog order by timestamp limit $logIndex, 1234567890
91-
EOSQL1
92-
my $deltaSth = $session->db->read($deltaSql);
93-
my $bucketSth = $session->db->prepare('insert into bucketLog (userId, Bucket, duration, timeStamp) VALUES (?,?,?,?)');
102+
my $deltaSth = get_statement($session, $logIndex);
103+
my $total_rows = $session->db->quickScalar('select found_rows()');
104+
105+
my $bucketSth = $session->db->prepare('insert into bucketLog (userId, Bucket, duration, timeStamp) VALUES (?,?,?,?)');
94106

95107
##Walk through the log file entries, one by one. Run each entry against
96108
##all the rules until 1 matches. If it doesn't match any rule, then bin it
97109
##into the "Other" bucket.
98-
DELTA_ENTRY: while (my $entry = $deltaSth->hashRef()) {
99-
++$logIndex;
100-
my $bucketFound = 0;
101-
my $url = $entry->{url};
102-
if (exists $bucketCache{$url}) {
103-
$bucketSth->execute([$entry->{userId}, $bucketCache{$url}, $entry->{delta}, $entry->{stamp}]);
104-
}
105-
else {
106-
RULE: foreach my $rule (@rules) {
107-
next RULE unless $url =~ $rule->[1];
108-
109-
# Into the bucket she goes..
110-
$bucketCache{$url} = $rule->[0];
111-
$bucketSth->execute([$entry->{userId}, $rule->[0], $entry->{delta}, $entry->{stamp}]);
112-
$bucketFound = 1;
113-
last RULE;
110+
DELTA_CHUNK: while (1) {
111+
DELTA_ENTRY: while (my $entry = $deltaSth->hashRef()) {
112+
++$logIndex;
113+
my $bucketFound = 0;
114+
my $url = $entry->{url};
115+
if (exists $bucketCache{$url}) {
116+
$bucketSth->execute([$entry->{userId}, $bucketCache{$url}, $entry->{delta}, $entry->{stamp}]);
114117
}
115-
if (!$bucketFound) {
116-
$bucketCache{$url} = 'Other';
117-
$bucketSth->execute([$entry->{userId}, 'Other', $entry->{delta}, $entry->{stamp}]);
118+
else {
119+
RULE: foreach my $rule (@rules) {
120+
next RULE unless $url =~ $rule->[1];
121+
122+
# Into the bucket she goes..
123+
$bucketCache{$url} = $rule->[0];
124+
$bucketSth->execute([$entry->{userId}, $rule->[0], $entry->{delta}, $entry->{stamp}]);
125+
$bucketFound = 1;
126+
last RULE;
127+
}
128+
if (!$bucketFound) {
129+
$bucketCache{$url} = 'Other';
130+
$bucketSth->execute([$entry->{userId}, 'Other', $entry->{delta}, $entry->{stamp}]);
131+
}
132+
}
133+
if (time() > $endTime) {
134+
$expired = 1;
135+
last DELTA_ENTRY;
118136
}
119137
}
120-
if (time() > $endTime) {
121-
$expired = 1;
122-
last DELTA_ENTRY;
123-
}
124-
}
125138

126-
if ($expired) {
127-
$instance->setScratch('logIndex', $logIndex);
128-
return $self->WAITING(1);
139+
if ($expired) {
140+
$instance->setScratch('logIndex', $logIndex);
141+
return $self->WAITING(1);
142+
}
143+
last DELTA_CHUNK if $logIndex >= $total_rows;
144+
$deltaSth = get_statement($session, $logIndex);
129145
}
130146
my $message = 'Passive analytics is done.';
131147
if ($session->setting->get('passiveAnalyticsDeleteDelta')) {

‎lib/WebGUI/Workflow/Activity/SummarizePassiveAnalytics.pm

+48-31
Original file line number | Diff line number | Diff line change
@@ -49,6 +49,20 @@ sub definition {
4949
return $class->SUPER::definition($session,$definition);
5050
}
5151

52+
#-------------------------------------------------------------------
53+
54+
=head2 get_statement( session, counter [, chunkSize] )

Run the passiveLog query for one chunk and return the resulting statement handle.

=over 4

=item session

The WebGUI session whose database handle runs the query.

=item counter

Zero-based row offset at which the chunk starts. C<undef> (no saved counter
yet) is treated as 0 rather than producing an uninitialized-value warning.

=item chunkSize

Optional maximum number of rows to fetch. Defaults to 500000. A missing value,
or one less than 1, falls back to the default, because a chunk that returns no
rows would never advance the caller's counter.

=back

Rows whose C<userId> is C<'1'> are excluded. The remaining rows are ordered by
userId, sessionId and timeStamp. The query uses C<SQL_CALC_FOUND_ROWS>, so the
caller can read the total number of matching rows (ignoring the C<LIMIT>) with
C<select found_rows()>. MySQL only reports that value for the most recent
SELECT, so it must be read before any other SELECT runs on the same connection.

=cut

sub get_statement {
    my ($session, $counter, $chunkSize) = @_;

    # Bounded chunks keep each result set small instead of streaming the whole
    # log through a single statement handle.
    my $limit  = (defined $chunkSize && $chunkSize >= 1) ? int($chunkSize) : 500_000;
    my $offset = defined $counter ? $counter + 0 : 0;

    # NOTE(review): (userId, sessionId, timeStamp) may not be unique, so tied
    # rows could shift between chunk queries. TODO confirm whether passiveLog
    # has a unique column that could be appended to the ORDER BY.
    # %d guarantees only an integer is interpolated into the LIMIT clause.
    my $passive = sprintf(
        q{select SQL_CALC_FOUND_ROWS * from passiveLog where userId <> '1' order by userId, sessionId, timeStamp limit ?, %d},
        $limit,
    );

    # The offset is bound as a placeholder; it was forced numeric above.
    return $session->db->read($passive, [$offset]);
}
5266

5367
#-------------------------------------------------------------------
5468

@@ -72,64 +86,67 @@ sub execute {
7286
my $endTime = time() + $self->getTTL;
7387
my $deltaInterval = $self->get('deltaInterval');
7488

75-
my $passive = q{select * from passiveLog where userId <> '1' order by userId, sessionId, timeStamp};
76-
my $sth;
7789
my $lastUserId;
7890
my $lastSessionId;
7991
my $lastTimeStamp;
8092
my $lastAssetId;
8193
my $lastUrl;
8294
my $counter = $instance->getScratch('counter');
95+
my $sth = get_statement($session, $counter);
8396
if ($counter) {
84-
$passive .= ' limit '. $counter .', 1234567890';
85-
$sth = $session->db->read($passive);
8697
$lastUserId = $instance->getScratch('lastUserId');
8798
$lastSessionId = $instance->getScratch('lastSessionId');
8899
$lastTimeStamp = $instance->getScratch('lastTimeStamp');
89100
$lastAssetId = $instance->getScratch('lastAssetId');
90101
$lastUrl = $instance->getScratch('lastUrl');
91102
}
92103
else {
93-
$sth = $session->db->read($passive);
94104
my $logLine = $sth->hashRef();
95105
$lastUserId = $logLine->{userId};
96106
$lastSessionId = $logLine->{sessionId};
97107
$lastTimeStamp = $logLine->{timeStamp};
98108
$lastAssetId = $logLine->{assetId};
99109
$lastUrl = $logLine->{url};
110+
$session->db->write('delete from deltaLog'); ##Only if we're starting out
100111
}
101112

102-
$session->db->write('delete from deltaLog'); ##Only if we're starting out
113+
my $total_rows = $session->db->quickScalar('select found_rows()');
114+
103115
my $deltaLog = $session->db->prepare('insert into deltaLog (userId, assetId, delta, timeStamp, url) VALUES (?,?,?,?,?)');
104116

105117
my $expired = 0;
106-
LOG_ENTRY: while (my $logLine = $sth->hashRef()) {
107-
$counter++;
108-
my $delta = $logLine->{timeStamp} - $lastTimeStamp;
109-
if ( $logLine->{userId} eq $lastUserId
110-
&& $logLine->{sessionId} eq $lastSessionId
111-
&& $delta < $deltaInterval ) {
112-
$deltaLog->execute([$lastUserId, $lastAssetId, $delta, $lastTimeStamp, $lastUrl]);
118+
LOG_CHUNK: while (1) {
119+
LOG_ENTRY: while (my $logLine = $sth->hashRef()) {
120+
$counter++;
121+
my $delta = $logLine->{timeStamp} - $lastTimeStamp;
122+
if ( $logLine->{userId} eq $lastUserId
123+
&& $logLine->{sessionId} eq $lastSessionId
124+
&& $delta < $deltaInterval ) {
125+
$deltaLog->execute([$lastUserId, $lastAssetId, $delta, $lastTimeStamp, $lastUrl]);
126+
}
127+
$lastUserId = $logLine->{userId};
128+
$lastSessionId = $logLine->{sessionId};
129+
$lastTimeStamp = $logLine->{timeStamp};
130+
$lastAssetId = $logLine->{assetId};
131+
$lastUrl = $logLine->{url};
132+
if (time() > $endTime) {
133+
$instance->setScratch('lastUserId', $lastUserId);
134+
$instance->setScratch('lastSessionId', $lastSessionId);
135+
$instance->setScratch('lastTimeStamp', $lastTimeStamp);
136+
$instance->setScratch('lastAssetId', $lastAssetId);
137+
$instance->setScratch('lastUrl', $lastUrl);
138+
$instance->setScratch('counter', $counter);
139+
$expired = 1;
140+
last LOG_ENTRY;
141+
}
113142
}
114-
$lastUserId = $logLine->{userId};
115-
$lastSessionId = $logLine->{sessionId};
116-
$lastTimeStamp = $logLine->{timeStamp};
117-
$lastAssetId = $logLine->{assetId};
118-
$lastUrl = $logLine->{url};
119-
if (time() > $endTime) {
120-
$instance->setScratch('lastUserId', $lastUserId);
121-
$instance->setScratch('lastSessionId', $lastSessionId);
122-
$instance->setScratch('lastTimeStamp', $lastTimeStamp);
123-
$instance->setScratch('lastAssetId', $lastAssetId);
124-
$instance->setScratch('lastUrl', $lastUrl);
125-
$instance->setScratch('counter', $counter);
126-
$expired = 1;
127-
last LOG_ENTRY;
143+
144+
$sth->finish;
145+
if ($expired) {
146+
return $self->WAITING(1);
128147
}
129-
}
130-
131-
if ($expired) {
132-
return $self->WAITING(1);
148+
last LOG_CHUNK if $counter >= $total_rows;
149+
$sth = get_statement($session, $counter);
133150
}
134151

135152
$instance->deleteScratch('lastUserId');

‎t/Workflow/Activity/BucketPassiveAnalytics.t

+2
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,8 @@ my $session = WebGUI::Test->session;
1818
$session->user({userId => 3});
1919

2020
WebGUI::Test->addToCleanup(SQL => 'delete from passiveLog');
21+
WebGUI::Test->addToCleanup(SQL => 'delete from deltaLog');
22+
WebGUI::Test->addToCleanup(SQL => 'delete from bucketLog');
2123
WebGUI::Test->addToCleanup(SQL => 'delete from analyticRule');
2224

2325
my $workflow = WebGUI::Workflow->new($session, 'PassiveAnalytics000001');

0 commit comments

Comments
 (0)
Please sign in to comment.