Implementation of fetching AE max columns

This commit is contained in:
v-kaywon 2017-08-08 12:03:51 -07:00
parent 3ab6df9105
commit 22b4817f65
3 changed files with 113 additions and 28 deletions

View file

@ -83,7 +83,7 @@ bool get_bit( _In_ void* ptr, _In_ unsigned int bit )
// read in LOB field during buffered result creation
SQLPOINTER read_lob_field( _Inout_ sqlsrv_stmt* stmt, _In_ SQLUSMALLINT field_index, _In_ sqlsrv_buffered_result_set::meta_data& meta,
_In_ zend_long mem_used TSRMLS_DC );
_In_ zend_long mem_used, _In_ size_t row_count TSRMLS_DC );
// dtor for each row in the cache
void cache_row_dtor( _In_ zval* data );
@ -687,7 +687,7 @@ sqlsrv_buffered_result_set::sqlsrv_buffered_result_set( _Inout_ sqlsrv_stmt* stm
out_buffer_length = &out_buffer_temp;
SQLPOINTER* lob_addr = reinterpret_cast<SQLPOINTER*>( &row[ meta[i].offset ] );
*lob_addr = read_lob_field( stmt, i, meta[i], mem_used TSRMLS_CC );
*lob_addr = read_lob_field( stmt, i, meta[i], mem_used, row_count TSRMLS_CC );
// a NULL pointer means NULL field
if( *lob_addr == NULL ) {
*out_buffer_length = SQL_NULL_DATA;
@ -734,12 +734,12 @@ sqlsrv_buffered_result_set::sqlsrv_buffered_result_set( _Inout_ sqlsrv_stmt* stm
break;
}
row_count++;
if( *out_buffer_length == SQL_NULL_DATA ) {
set_bit( row, i );
}
}
row_count++;
SQLSRV_ASSERT( row_count < INT_MAX, "Hard maximum of 2 billion rows exceeded in a buffered query" );
// add it to the cache
@ -1498,7 +1498,7 @@ void cache_row_dtor( _In_ zval* data )
}
SQLPOINTER read_lob_field( _Inout_ sqlsrv_stmt* stmt, _In_ SQLUSMALLINT field_index, _In_ sqlsrv_buffered_result_set::meta_data& meta,
_In_ zend_long mem_used TSRMLS_DC )
_In_ zend_long mem_used, _In_ size_t row_count TSRMLS_DC )
{
SQLSMALLINT extra = 0;
SQLULEN* output_buffer_len = NULL;
@ -1563,6 +1563,19 @@ SQLPOINTER read_lob_field( _Inout_ sqlsrv_stmt* stmt, _In_ SQLUSMALLINT field_in
SQLSRV_ASSERT( SQL_SUCCEEDED( r ), "Unknown SQL error not triggered" );
if ( stmt->conn->ce_option.enabled == true ) {
// cursor type SQLSRV_CURSOR_BUFFERED has to be FORWARD_ONLY
// thus has to close and reopen cursor to reset the cursor buffer
core::SQLCloseCursor(stmt);
core::SQLExecute(stmt);
// FETCH_NEXT until the cursor reaches the row that it was at
for (int i = 0; i <= row_count; i++) {
core::SQLFetchScroll(stmt, SQL_FETCH_NEXT, 0);
}
}
else {
already_read += to_read - already_read;
}
// if the type of the field returns the total to be read, we use that and preallocate the buffer
if( last_field_len != SQL_NO_TOTAL ) {
@ -1571,8 +1584,6 @@ SQLPOINTER read_lob_field( _Inout_ sqlsrv_stmt* stmt, _In_ SQLUSMALLINT field_in
throw core::CoreException();
}
already_read += to_read - already_read;
to_read = last_field_len;
buffer.resize( to_read + extra + sizeof( SQLULEN ));
output_buffer_len = reinterpret_cast<SQLULEN*>( buffer.get() );
@ -1582,7 +1593,6 @@ SQLPOINTER read_lob_field( _Inout_ sqlsrv_stmt* stmt, _In_ SQLUSMALLINT field_in
}
// otherwise allocate another chunk of memory to read in
else {
already_read += to_read - already_read;
to_read *= 2;
CHECK_CUSTOM_ERROR( mem_used + to_read > stmt->buffered_query_limit * 1024, stmt,
SQLSRV_ERROR_BUFFER_LIMIT_EXCEEDED, stmt->buffered_query_limit ) {

View file

@ -1321,6 +1321,7 @@ struct sqlsrv_stmt : public sqlsrv_context {
bool past_fetch_end; // Core_sqlsrv_fetch sets this field when the statement goes beyond the last row
sqlsrv_result_set* current_results; // Current result set
SQLULEN cursor_type; // Type of cursor for the current result set
int fwd_row_index; // fwd_row_index is the current row index, SQL_CURSOR_FORWARD_ONLY
bool has_rows; // Has_rows is set if there are actual rows in the row set
bool fetch_called; // Used by core_sqlsrv_get_field to return an informative error if fetch not yet called
int last_field_index; // last field retrieved by core_sqlsrv_get_field
@ -1891,6 +1892,14 @@ namespace core {
}
}
inline void SQLCloseCursor( _Inout_ sqlsrv_stmt* stmt TSRMLS_DC )
{
SQLRETURN r = ::SQLCloseCursor( stmt->handle() );
CHECK_SQL_ERROR_OR_WARNING( r, stmt ) {
throw CoreException();
}
}
inline void SQLColAttribute( _Inout_ sqlsrv_stmt* stmt, _In_ SQLUSMALLINT field_index, _In_ SQLUSMALLINT field_identifier,
_Out_writes_bytes_opt_(buffer_length) SQLPOINTER field_type_char, _In_ SQLSMALLINT buffer_length,
@ -2012,6 +2021,15 @@ namespace core {
CHECK_SQL_ERROR_OR_WARNING( r, ctx ) {}
}
inline void SQLGetStmtAttr( _Inout_ sqlsrv_stmt* stmt, _In_ SQLINTEGER attr, _Out_writes_opt_(buf_len) void* value_ptr, _In_ SQLINTEGER buf_len, _Out_opt_ SQLINTEGER* str_len TSRMLS_DC)
{
SQLRETURN r;
r = ::SQLGetStmtAttr( stmt->handle(), attr, value_ptr, buf_len, str_len );
CHECK_SQL_ERROR_OR_WARNING( r, stmt ) {
throw CoreException();
}
}
inline SQLRETURN SQLGetData( _Inout_ sqlsrv_stmt* stmt, _In_ SQLUSMALLINT field_index, _In_ SQLSMALLINT target_type,
_Out_writes_opt_(buffer_length) void* buffer, _In_ SQLLEN buffer_length, _Out_opt_ SQLLEN* out_buffer_length,
_In_ bool handle_warning TSRMLS_DC )

View file

@ -131,6 +131,7 @@ sqlsrv_stmt::sqlsrv_stmt( _In_ sqlsrv_conn* c, _In_ SQLHANDLE handle, _In_ error
past_fetch_end( false ),
current_results( NULL ),
cursor_type( SQL_CURSOR_FORWARD_ONLY ),
fwd_row_index( -1 ),
has_rows( false ),
fetch_called( false ),
last_field_index( -1 ),
@ -213,6 +214,7 @@ void sqlsrv_stmt::free_param_data( TSRMLS_D )
void sqlsrv_stmt::new_result_set( TSRMLS_D )
{
this->fetch_called = false;
this->fwd_row_index = -1;
this->has_rows = false;
this->past_next_result_end = false;
this->past_fetch_end = false;
@ -796,6 +798,9 @@ bool core_sqlsrv_fetch( _Inout_ sqlsrv_stmt* stmt, _In_ SQLSMALLINT fetch_orient
// move to the record requested. For absolute records, we use a 0 based offset, so +1 since
// SQLFetchScroll uses a 1 based offset, otherwise for relative, just use the fetch_offset provided.
SQLRETURN r = stmt->current_results->fetch( fetch_orientation, ( fetch_orientation == SQL_FETCH_RELATIVE ) ? fetch_offset : fetch_offset + 1 TSRMLS_CC );
if ( stmt->cursor_type == SQL_CURSOR_FORWARD_ONLY && stmt->conn->ce_option.enabled == true ) {
stmt->fwd_row_index++;
}
if( r == SQL_NO_DATA ) {
// if this is a forward only cursor, mark that we've passed the end so future calls result in an error
if( stmt->cursor_type == SQL_CURSOR_FORWARD_ONLY ) {
@ -2171,6 +2176,7 @@ void get_field_as_string( _Inout_ sqlsrv_stmt* stmt, _In_ SQLUSMALLINT field_ind
else {
// Get the SQL type of the field. unixODBC 2.3.1 requires wide calls to support pooling
core::SQLColAttributeW( stmt, field_index + 1, SQL_DESC_CONCISE_TYPE, NULL, 0, NULL, &sql_field_type TSRMLS_CC );
SQLLEN sql_field_len = 0;
// Calculate the field size.
calc_string_size( stmt, field_index, sql_field_type, sql_display_size TSRMLS_CC );
@ -2183,8 +2189,9 @@ void get_field_as_string( _Inout_ sqlsrv_stmt* stmt, _In_ SQLUSMALLINT field_ind
// if this is a large type, then read the first few bytes to get the actual length from SQLGetData
if( sql_display_size == 0 || sql_display_size == INT_MAX ||
sql_display_size == INT_MAX >> 1 || sql_display_size == UINT_MAX - 1 ) {
field_len_temp = INITIAL_FIELD_STRING_LEN;
SQLLEN initiallen = field_len_temp + extra;
@ -2234,19 +2241,48 @@ void get_field_as_string( _Inout_ sqlsrv_stmt* stmt, _In_ SQLUSMALLINT field_ind
field_value_temp = static_cast<char*>( sqlsrv_realloc( field_value_temp, field_len_temp + extra + 1 ));
field_len_temp -= initial_field_len;
// only handled differently when AE is on because AE does not support streaming
// SQLSRV_CURSOR_BUFFERED already fetched everything beforehand, so doesn't need to be handled differently here
if ( stmt->conn->ce_option.enabled == true && stmt->current_results->odbc->cursor_type != SQLSRV_CURSOR_BUFFERED ) {
// if the cursor is forward only, we have no choice but to close the cursor and open it again
// but if not forward only, we can simply fetch next then fetch prior to reset the cursor
if ( stmt->current_results->odbc->cursor_type == SQL_CURSOR_FORWARD_ONLY ) {
// reopen the cursor
core::SQLCloseCursor( stmt->current_results->odbc );
core::SQLExecute( stmt );
// FETCH_NEXT until the cursor reaches the row that it was at
for ( int i = 0; i <= stmt->fwd_row_index; i++ ) {
core::SQLFetchScroll( stmt->current_results->odbc, SQL_FETCH_NEXT, 0 );
}
}
else {
core::SQLFetchScroll( stmt->current_results->odbc, SQL_FETCH_NEXT, 0 );
core::SQLFetchScroll( stmt->current_results->odbc, SQL_FETCH_PRIOR, 0 );
}
// now that the fetch buffer has reset, fetch the original column again with a bigger buffer length
r = stmt->current_results->get_data( field_index + 1, c_type, field_value_temp, field_len_temp + extra,
&dummy_field_len, false /*handle_warning*/ TSRMLS_CC );
// if field_len_temp was bit enough to hold all data, dummy_field_len contain the actual amount retrieved,
// not SQL_NO_TOTAL
if ( dummy_field_len != SQL_NO_TOTAL )
field_len_temp = dummy_field_len;
else
field_len_temp += initial_field_len;
}
else {
field_len_temp -= initial_field_len;
// Get the rest of the data.
r = stmt->current_results->get_data( field_index + 1, c_type, field_value_temp + initial_field_len,
field_len_temp + extra, &dummy_field_len,
false /*handle_warning*/ TSRMLS_CC );
// the last packet will contain the actual amount retrieved, not SQL_NO_TOTAL
// so we calculate the actual length of the string with that.
if( dummy_field_len != SQL_NO_TOTAL )
field_len_temp += dummy_field_len;
else
field_len_temp += initial_field_len;
// Get the rest of the data.
r = stmt->current_results->get_data( field_index + 1, c_type, field_value_temp + initial_field_len,
field_len_temp + extra, &dummy_field_len,
false /*handle_warning*/ TSRMLS_CC );
// the last packet will contain the actual amount retrieved, not SQL_NO_TOTAL
// so we calculate the actual length of the string with that.
if ( dummy_field_len != SQL_NO_TOTAL )
field_len_temp += dummy_field_len;
else
field_len_temp += initial_field_len;
}
if( r == SQL_SUCCESS_WITH_INFO ) {
core::SQLGetDiagField( stmt, 1, SQL_DIAG_SQLSTATE, state, SQL_SQLSTATE_BUFSIZE, &len
@ -2260,13 +2296,36 @@ void get_field_as_string( _Inout_ sqlsrv_stmt* stmt, _In_ SQLUSMALLINT field_ind
// We got the field_len_temp from SQLGetData call.
field_value_temp = static_cast<char*>( sqlsrv_realloc( field_value_temp, field_len_temp + extra + 1 ));
// We have already recieved INITIAL_FIELD_STRING_LEN size data.
field_len_temp -= INITIAL_FIELD_STRING_LEN;
if ( stmt->conn->ce_option.enabled == true && stmt->current_results->odbc->cursor_type != SQLSRV_CURSOR_BUFFERED ) {
// if the cursor is forward only, we have no choice but to close the cursor and open it again
// but if not forward only, we can simply fetch next then fetch prior to reset the cursor
if ( stmt->current_results->odbc->cursor_type == SQL_CURSOR_FORWARD_ONLY ) {
// reopen the cursor
core::SQLCloseCursor( stmt->current_results->odbc );
core::SQLExecute( stmt );
// FETCH_NEXT until the cursor reaches the row that it was at
for ( int i = 0; i <= stmt->fwd_row_index; i++ ) {
core::SQLFetchScroll( stmt->current_results->odbc, SQL_FETCH_NEXT, 0 );
}
}
else {
core::SQLFetchScroll( stmt->current_results->odbc, SQL_FETCH_NEXT, 0 );
core::SQLFetchScroll( stmt->current_results->odbc, SQL_FETCH_PRIOR, 0 );
}
// now that the fetch buffer has reset, fetch the original column again with a bigger buffer length
r = stmt->current_results->get_data( field_index + 1, c_type, field_value_temp, field_len_temp + extra,
&dummy_field_len, false /*handle_warning*/ TSRMLS_CC );
}
else {
// We have already recieved INITIAL_FIELD_STRING_LEN size data.
field_len_temp -= INITIAL_FIELD_STRING_LEN;
// Get the rest of the data.
r = stmt->current_results->get_data( field_index + 1, c_type, field_value_temp + INITIAL_FIELD_STRING_LEN,
field_len_temp + extra, &dummy_field_len,
true /*handle_warning*/ TSRMLS_CC );
// Get the rest of the data.
r = stmt->current_results->get_data( field_index + 1, c_type, field_value_temp + INITIAL_FIELD_STRING_LEN,
field_len_temp + extra, &dummy_field_len,
true /*handle_warning*/ TSRMLS_CC );
field_len_temp += INITIAL_FIELD_STRING_LEN;
}
if( dummy_field_len == SQL_NULL_DATA ) {
field_value = NULL;
@ -2277,8 +2336,6 @@ void get_field_as_string( _Inout_ sqlsrv_stmt* stmt, _In_ SQLUSMALLINT field_ind
CHECK_CUSTOM_ERROR(( r == SQL_NO_DATA ), stmt, SQLSRV_ERROR_NO_DATA, field_index ) {
throw core::CoreException();
}
field_len_temp += INITIAL_FIELD_STRING_LEN;
}
} // if( is_truncation_warning ( state ) )