Commit ed00d67c authored by sam marshall's avatar sam marshall
Browse files

MDL-60174 core_dml: get_recordset on Postgres eats all the RAM

On Postgres, get_recordset_sql loads all the results into memory
(within the Postgres library, which doesn't count towards the PHP
memory limit, but does count towards making your server run out of
memory) as soon as the query completes.

This commit changes the code to use cursors, which in Postgres
allow the results to be returned in smaller chunks (by default
100,000 rows).
parent 5bde2c2b
......@@ -45,6 +45,12 @@ class pgsql_native_moodle_database extends moodle_database {
/** @var bool savepoint hack for MDL-35506 - workaround for automatic transaction rollback on error */
protected $savepointpresent = false;
/** @var int Number of cursors used (for constructing a unique ID) */
protected $cursorcount = 0;
/** @var int Default number of rows to fetch at a time when using recordsets with cursors */
const DEFAULT_FETCH_BUFFER_SIZE = 100000;
/**
* Detects if all needed PHP stuff installed.
* Note: can be used before connect()
......@@ -734,14 +740,89 @@ class pgsql_native_moodle_database extends moodle_database {
list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
$this->query_start($sql, $params, SQL_QUERY_SELECT);
$result = pg_query_params($this->pgsql, $sql, $params);
// For any query that doesn't explicitly specify a limit, we must use cursors to stop it
// loading the entire thing (unless the config setting is turned off).
$usecursors = !$limitnum && ($this->get_fetch_buffer_size() > 0);
if ($usecursors) {
// Work out the cursor unique identifer. This is based on a simple count used which
// should be OK because the identifiers only need to be unique within the current
// transaction.
$this->cursorcount++;
$cursorname = 'crs' . $this->cursorcount;
// Do the query to a cursor.
$sql = 'DECLARE ' . $cursorname . ' NO SCROLL CURSOR WITH HOLD FOR ' . $sql;
$result = pg_query_params($this->pgsql, $sql, $params);
} else {
$result = pg_query_params($this->pgsql, $sql, $params);
$cursorname = '';
}
$this->query_end($result);
if ($usecursors) {
pg_free_result($result);
$result = null;
}
return $this->create_recordset($result);
return new pgsql_native_moodle_recordset($result, $this, $cursorname);
}
protected function create_recordset($result) {
return new pgsql_native_moodle_recordset($result);
/**
* Gets size of fetch buffer used for recordset queries.
*
* If this returns 0 then cursors will not be used, meaning recordset queries will occupy enough
* memory as needed for the Postgres library to hold the entire query results in memory.
*
* @return int Fetch buffer size or 0 indicating not to use cursors
*/
protected function get_fetch_buffer_size() {
if (array_key_exists('fetchbuffersize', $this->dboptions)) {
return (int)$this->dboptions['fetchbuffersize'];
} else {
return self::DEFAULT_FETCH_BUFFER_SIZE;
}
}
/**
* Retrieves data from cursor. For use by recordset only; do not call directly.
*
* Return value contains the next batch of Postgres data, and a boolean indicating if this is
* definitely the last batch (if false, there may be more)
*
* @param string $cursorname Name of cursor to read from
* @return array Array with 2 elements (next data batch and boolean indicating last batch)
*/
public function fetch_from_cursor($cursorname) {
$count = $this->get_fetch_buffer_size();
$sql = 'FETCH ' . $count . ' FROM ' . $cursorname;
$this->query_start($sql, [], SQL_QUERY_AUX);
$result = pg_query($this->pgsql, $sql);
$last = pg_num_rows($result) !== $count;
$this->query_end($result);
return [$result, $last];
}
/**
* Closes a cursor. For use by recordset only; do not call directly.
*
* @param string $cursorname Name of cursor to close
* @return bool True if we actually closed one, false if the transaction was cancelled
*/
public function close_cursor($cursorname) {
// If the transaction got cancelled, then ignore this request.
$sql = 'CLOSE ' . $cursorname;
$this->query_start($sql, [], SQL_QUERY_AUX);
$result = pg_query($this->pgsql, $sql);
$this->query_end($result);
if ($result) {
pg_free_result($result);
}
return true;
}
/**
......@@ -1366,7 +1447,7 @@ class pgsql_native_moodle_database extends moodle_database {
protected function begin_transaction() {
$this->savepointpresent = true;
$sql = "BEGIN ISOLATION LEVEL READ COMMITTED; SAVEPOINT moodle_pg_savepoint";
$this->query_start($sql, NULL, SQL_QUERY_AUX);
$this->query_start($sql, null, SQL_QUERY_AUX);
$result = pg_query($this->pgsql, $sql);
$this->query_end($result);
......@@ -1381,7 +1462,7 @@ class pgsql_native_moodle_database extends moodle_database {
protected function commit_transaction() {
$this->savepointpresent = false;
$sql = "RELEASE SAVEPOINT moodle_pg_savepoint; COMMIT";
$this->query_start($sql, NULL, SQL_QUERY_AUX);
$this->query_start($sql, null, SQL_QUERY_AUX);
$result = pg_query($this->pgsql, $sql);
$this->query_end($result);
......@@ -1396,7 +1477,7 @@ class pgsql_native_moodle_database extends moodle_database {
protected function rollback_transaction() {
$this->savepointpresent = false;
$sql = "RELEASE SAVEPOINT moodle_pg_savepoint; ROLLBACK";
$this->query_start($sql, NULL, SQL_QUERY_AUX);
$this->query_start($sql, null, SQL_QUERY_AUX);
$result = pg_query($this->pgsql, $sql);
$this->query_end($result);
......
......@@ -40,26 +40,64 @@ class pgsql_native_moodle_recordset extends moodle_recordset {
protected $current;
protected $blobs = array();
/** @var string Name of cursor or '' if none */
protected $cursorname;
/** @var pgsql_native_moodle_database Postgres database resource */
protected $db;
/** @var bool True if there are no more rows to fetch from the cursor */
protected $lastbatch;
/**
* Build a new recordset to iterate over.
*
* @param resource $result A pg_query() result object to create a recordset from.
* When using cursors, $result will be null initially.
*
* @param resource|null $result A pg_query() result object to create a recordset from.
* @param pgsql_native_moodle_database $db Database object (only required when using cursors)
* @param string $cursorname Name of cursor or '' if none
*/
public function __construct($result) {
public function __construct($result, pgsql_native_moodle_database $db = null, $cursorname = '') {
if ($cursorname && !$db) {
throw new coding_exception('When specifying a cursor, $db is required');
}
$this->result = $result;
$this->db = $db;
$this->cursorname = $cursorname;
// When there is a cursor, do the initial fetch.
if ($cursorname) {
$this->fetch_cursor_block();
}
// Find out if there are any blobs.
$numfields = pg_num_fields($result);
$numfields = pg_num_fields($this->result);
for ($i = 0; $i < $numfields; $i++) {
$type = pg_field_type($result, $i);
$type = pg_field_type($this->result, $i);
if ($type == 'bytea') {
$this->blobs[] = pg_field_name($result, $i);
$this->blobs[] = pg_field_name($this->result, $i);
}
}
$this->current = $this->fetch_next();
}
/**
* Fetches the next block of data when using cursors.
*
* @throws coding_exception If you call this when the fetch buffer wasn't freed yet
*/
protected function fetch_cursor_block() {
if ($this->result) {
throw new coding_exception('Unexpected non-empty result when fetching from cursor');
}
list($this->result, $this->lastbatch) = $this->db->fetch_from_cursor($this->cursorname);
if (!$this->result) {
throw new coding_exception('Unexpected failure when fetching from cursor');
}
}
public function __destruct() {
$this->close();
}
......@@ -69,9 +107,21 @@ class pgsql_native_moodle_recordset extends moodle_recordset {
return false;
}
if (!$row = pg_fetch_assoc($this->result)) {
// There are no more rows in this result.
pg_free_result($this->result);
$this->result = null;
return false;
// If using a cursor, can we fetch the next block?
if ($this->cursorname && !$this->lastbatch) {
$this->fetch_cursor_block();
if (!$row = pg_fetch_assoc($this->result)) {
pg_free_result($this->result);
$this->result = null;
return false;
}
} else {
return false;
}
}
if ($this->blobs) {
......@@ -111,5 +161,11 @@ class pgsql_native_moodle_recordset extends moodle_recordset {
}
$this->current = null;
$this->blobs = null;
// If using cursors, close the cursor.
if ($this->cursorname) {
$this->db->close_cursor($this->cursorname);
$this->cursorname = null;
}
}
}
<?php
// This file is part of Moodle - http://moodle.org/
//
// Moodle is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Moodle is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Moodle. If not, see <http://www.gnu.org/licenses/>.
/**
* Test specific features of the Postgres dml support relating to recordsets.
*
* @package core
* @category test
* @copyright 2017 The Open University
* @license http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later
*/
defined('MOODLE_INTERNAL') || die();
global $CFG;
require_once($CFG->dirroot.'/lib/dml/pgsql_native_moodle_database.php');
/**
* Test specific features of the Postgres dml support relating to recordsets.
*
* @package core
* @category test
* @copyright 2017 The Open University
* @license http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later
*/
class pgsql_native_recordset_testcase extends basic_testcase {
/** @var pgsql_native_moodle_database Special database connection */
protected $specialdb;
/**
* Creates a second db connection and a temp table with values in for testing.
*/
protected function setUp() {
global $DB;
parent::setUp();
// Skip tests if not using Postgres.
if (!($DB instanceof pgsql_native_moodle_database)) {
$this->markTestSkipped('Postgres-only test');
}
}
/**
* Initialises database connection with given fetch buffer size
* @param int $fetchbuffersize Size of fetch buffer
*/
protected function init_db($fetchbuffersize) {
global $CFG, $DB;
// To make testing easier, create a database with the same dboptions as the real one,
// but a low number for the cursor size.
$this->specialdb = \moodle_database::get_driver_instance('pgsql', 'native', true);
$dboptions = ['fetchbuffersize' => $fetchbuffersize];
$this->specialdb->connect($CFG->dbhost, $CFG->dbuser, $CFG->dbpass, $CFG->dbname,
$DB->get_prefix(), $dboptions);
// Create a temp table.
$dbman = $this->specialdb->get_manager();
$table = new xmldb_table('silly_test_table');
$table->add_field('id', XMLDB_TYPE_INTEGER, 10, null, XMLDB_NOTNULL, XMLDB_SEQUENCE);
$table->add_field('msg', XMLDB_TYPE_CHAR, 255);
$table->add_key('primary', XMLDB_KEY_PRIMARY, ['id']);
$dbman->create_temp_table($table);
// Add some records to the table.
for ($index = 1; $index <= 7; $index++) {
$this->specialdb->insert_record('silly_test_table', ['msg' => 'record' . $index]);
}
}
/**
* Gets rid of the second db connection.
*/
protected function tearDown() {
if ($this->specialdb) {
$table = new xmldb_table('silly_test_table');
$this->specialdb->get_manager()->drop_table($table);
$this->specialdb->dispose();
$this->specialdb = null;
}
parent::tearDown();
}
/**
* Tests that get_recordset_sql works when using cursors, which it does when no limit is
* specified.
*/
public function test_recordset_cursors() {
$this->init_db(3);
// Query the table and check the actual queries using debug mode, also check the count.
$this->specialdb->set_debug(true);
$before = $this->specialdb->perf_get_queries();
ob_start();
$rs = $this->specialdb->get_recordset_sql('SELECT * FROM {silly_test_table} ORDER BY id');
$index = 0;
foreach ($rs as $rec) {
$index++;
$this->assertEquals('record' . $index, $rec->msg);
}
$this->assertEquals(7, $index);
$rs->close();
$debugging = ob_get_contents();
ob_end_clean();
// Expect 4 fetches - first three, next three, last one (with 2).
$this->assert_query_regexps([
'~SELECT \* FROM~',
'~FETCH 3 FROM crs1~',
'~FETCH 3 FROM crs1~',
'~FETCH 3 FROM crs1~',
'~CLOSE crs1~'], $debugging);
// There should have been 7 queries tracked for perf log.
$this->assertEquals(5, $this->specialdb->perf_get_queries() - $before);
// Try a second time - this time we'll request exactly 3 items so that it has to query
// twice (as it can't tell if the first batch is the last).
$before = $this->specialdb->perf_get_queries();
ob_start();
$rs = $this->specialdb->get_recordset_sql(
'SELECT * FROM {silly_test_table} WHERE id <= ? ORDER BY id', [3]);
$index = 0;
foreach ($rs as $rec) {
$index++;
$this->assertEquals('record' . $index, $rec->msg);
}
$this->assertEquals(3, $index);
$rs->close();
$debugging = ob_get_contents();
ob_end_clean();
$this->specialdb->set_debug(false);
// Expect 2 fetches - first three, then next one (empty).
$this->assert_query_regexps([
'~SELECT \* FROM~',
'~FETCH 3 FROM crs2~',
'~FETCH 3 FROM crs2~',
'~CLOSE crs2~'], $debugging);
// There should have been 4 queries tracked for perf log.
$this->assertEquals(4, $this->specialdb->perf_get_queries() - $before);
}
/**
* Tests that get_recordset_sql works when using cursors and when there are two overlapping
* recordsets being used.
*/
public function test_recordset_cursors_overlapping() {
$this->init_db(3);
$rs1 = $this->specialdb->get_recordset('silly_test_table', null, 'id');
$rs2 = $this->specialdb->get_recordset('silly_test_table', null, 'id DESC');
// Read first 3 from first recordset.
$read = [];
$read[] = $rs1->current()->id;
$rs1->next();
$read[] = $rs1->current()->id;
$rs1->next();
$read[] = $rs1->current()->id;
$rs1->next();
$this->assertEquals([1, 2, 3], $read);
// Read 5 from second recordset.
$read = [];
$read[] = $rs2->current()->id;
$rs2->next();
$read[] = $rs2->current()->id;
$rs2->next();
$read[] = $rs2->current()->id;
$rs2->next();
$read[] = $rs2->current()->id;
$rs2->next();
$read[] = $rs2->current()->id;
$rs2->next();
$this->assertEquals([7, 6, 5, 4, 3], $read);
// Now read remainder of first recordset and close it.
$read = [];
$read[] = $rs1->current()->id;
$rs1->next();
$read[] = $rs1->current()->id;
$rs1->next();
$read[] = $rs1->current()->id;
$rs1->next();
$read[] = $rs1->current()->id;
$rs1->next();
$this->assertFalse($rs1->valid());
$this->assertEquals([4, 5, 6, 7], $read);
$rs1->close();
// And remainder of second.
$read = [];
$read[] = $rs2->current()->id;
$rs2->next();
$read[] = $rs2->current()->id;
$rs2->next();
$this->assertFalse($rs2->valid());
$this->assertEquals([2, 1], $read);
$rs2->close();
}
/**
* Tests that get_recordset_sql works when using cursors and transactions inside.
*/
public function test_recordset_cursors_transaction_inside() {
$this->init_db(3);
// Transaction inside the recordset processing.
$rs = $this->specialdb->get_recordset('silly_test_table', null, 'id');
$read = [];
foreach ($rs as $rec) {
$read[] = $rec->id;
$transaction = $this->specialdb->start_delegated_transaction();
$transaction->allow_commit();
}
$this->assertEquals([1, 2, 3, 4, 5, 6, 7], $read);
$rs->close();
}
/**
* Tests that get_recordset_sql works when using cursors and a transaction outside.
*/
public function test_recordset_cursors_transaction_outside() {
$this->init_db(3);
// Transaction outside the recordset processing.
$transaction = $this->specialdb->start_delegated_transaction();
$rs = $this->specialdb->get_recordset('silly_test_table', null, 'id');
$read = [];
foreach ($rs as $rec) {
$read[] = $rec->id;
}
$this->assertEquals([1, 2, 3, 4, 5, 6, 7], $read);
$rs->close();
$transaction->allow_commit();
}
/**
* Tests that get_recordset_sql works when using cursors and a transaction overlapping.
*/
public function test_recordset_cursors_transaction_overlapping_before() {
$this->init_db(3);
// Transaction outside the recordset processing.
$transaction = $this->specialdb->start_delegated_transaction();
$rs = $this->specialdb->get_recordset('silly_test_table', null, 'id');
$transaction->allow_commit();
$read = [];
foreach ($rs as $rec) {
$read[] = $rec->id;
}
$this->assertEquals([1, 2, 3, 4, 5, 6, 7], $read);
$rs->close();
}
/**
* Tests that get_recordset_sql works when using cursors and a transaction overlapping.
*/
public function test_recordset_cursors_transaction_overlapping_after() {
$this->init_db(3);
// Transaction outside the recordset processing.
$rs = $this->specialdb->get_recordset('silly_test_table', null, 'id');
$transaction = $this->specialdb->start_delegated_transaction();
$read = [];
foreach ($rs as $rec) {
$read[] = $rec->id;
}
$this->assertEquals([1, 2, 3, 4, 5, 6, 7], $read);
$rs->close();
$transaction->allow_commit();
}
/**
* Tests that get_recordset_sql works when using cursors and a transaction that 'fails' and gets
* rolled back.
*/
public function test_recordset_cursors_transaction_rollback() {
$this->init_db(3);
try {
$rs = $this->specialdb->get_recordset('silly_test_table', null, 'id');
$transaction = $this->specialdb->start_delegated_transaction();
$this->specialdb->delete_records('silly_test_table', ['id' => 5]);
$transaction->rollback(new dml_transaction_exception('rollback please'));
$this->fail('should not get here');
} catch (dml_transaction_exception $e) {
$this->assertContains('rollback please', $e->getMessage());
} finally {
// Rollback should not kill our recordset.
$read = [];
foreach ($rs as $rec) {
$read[] = $rec->id;
}
$this->assertEquals([1, 2, 3, 4, 5, 6, 7], $read);
// This would happen in real code (that isn't within the same function) anyway because
// it would go out of scope.
$rs->close();
}
// OK, transaction aborted, now get the recordset again and check nothing was deleted.
$rs = $this->specialdb->get_recordset('silly_test_table', null, 'id');
$read = [];
foreach ($rs as $rec) {
$read[] = $rec->id;
}
$this->assertEquals([1, 2, 3, 4, 5, 6, 7], $read);
$rs->close();
}
/**
* Tests that get_recordset_sql works when not using cursors, because a limit is specified.
*/
public function test_recordset_no_cursors_limit() {
$this->init_db(3);
$this->specialdb->set_debug(true);
$before = $this->specialdb->perf_get_queries();
ob_start();
$rs = $this->specialdb->get_recordset_sql(
'SELECT * FROM {silly_test_table} ORDER BY id', [], 0, 100);
$index = 0;
foreach ($rs as $rec) {
$index++;
$this->assertEquals('record' . $index, $rec->msg);
}
$this->assertEquals(7, $index);
$rs->close();
$this->specialdb->set_debug(false);
$debugging = ob_get_contents();
ob_end_clean();
// Expect direct request without using cursors.
$this->assert_query_regexps(['~SELECT \* FROM~'], $debugging);
// There should have been 1 query tracked for perf log.
$this->assertEquals(1, $this->specialdb->perf_get_queries() - $before);
}
/**
* Tests that get_recordset_sql works when not using cursors, because the config setting turns
* them off.
*/
public function test_recordset_no_cursors_config() {
$this->init_db(0);
$this->specialdb->set_debug(true);
$before = $this->specialdb->perf_get_queries();
ob_start();
$rs = $this->specialdb->get_recordset_sql('SELECT * FROM {silly_test_table} ORDER BY id');