diff --git a/lib/mysql-native/commands/execute.js b/lib/mysql-native/commands/execute.js index 30c4760..9b76dbc 100644 --- a/lib/mysql-native/commands/execute.js +++ b/lib/mysql-native/commands/execute.js @@ -43,6 +43,11 @@ function execPrepared(sql, parameters) start: function() { this.stmt = sql + if (parameters) + { + parameters = parameters.map(function(p){ return p instanceof Buffer ? + p.toString('binary') : p }); + } this.params = parameters // don't execute if prepare failed @@ -96,7 +101,15 @@ function execPrepared(sql, parameters) if(typeof parameters[i] == "boolean" || (parameters[i] instanceof Array && parameters[i].every(function(x){return typeof x == 'boolean';}))){ packet.lcbits(parameters[i]); } - packet.lcstring(parameters[i].toString()) + if (parameters[i] instanceof Date) + { + packet.lcstring(parameters[i].toISOString() + .replace(/^(.*)T(.*)\..*Z/, '$1 $2')) + } + else + { + packet.lcstring(parameters[i].toString()) + } } } } @@ -172,7 +185,7 @@ function execPrepared(sql, parameters) var field = this.ps.fields[f]; if (!null_bit_map[f]) { - var value = r.unpackBinary(field.type, field.flags & field_flags.UNSIGNED); + var value = r.unpackBinary(field.type, field.flags & field_flags.UNSIGNED, field.charsetnum === 63); this.store_column(row, field, value, use_hash); } else { this.store_column(row, field, null, use_hash); diff --git a/lib/mysql-native/commands/query.js b/lib/mysql-native/commands/query.js index 356fe23..675d392 100644 --- a/lib/mysql-native/commands/query.js +++ b/lib/mysql-native/commands/query.js @@ -8,13 +8,15 @@ var result = require('../result') function parseTime(s) { - // no parsing here with non-binary prtocol, return date as is - return s; + return new Date(s+' UTC'); } -function parseString(s) +function parseString(s, type, csn) { - return s; + if (csn===63) + return new Buffer(s, 'binary'); + else + return s; } function parseNull(s) @@ -90,9 +92,9 @@ function serialiseString(a) } */ -var string2type = function(str, t) +var string2type = function(str, t, charset) { - return type_parsers[t](str, t); + return type_parsers[t](str, t, charset); } module.exports = function(sql) @@ -156,7 +158,7 @@ module.exports = function(sql) var strValue = r.lcstring(); if( charset ) strValue = charset.convertFromBytes( strValue ); - var value = string2type(strValue, field.type); // todo: move to serialiser unpackString + var value = string2type(strValue, field.type, field.charsetnum); // todo: move to serialiser unpackString this.store_column(row, field, value, use_hash) field_index++; } diff --git a/lib/mysql-native/pool.js b/lib/mysql-native/pool.js new file mode 100644 index 0000000..be4aa31 --- /dev/null +++ b/lib/mysql-native/pool.js @@ -0,0 +1,159 @@ +/* source: https://github.com/felixge/node-mysql/blob/d91edc4a6e5838285e8debd4e1d82a1cfe163074/lib/Pool.js */ +var Mysql = require('mysql-native'); + +module.exports = Pool; +function Pool(options) { + this.config = {}; + var config = this.config; + config.conn_options = options.conn_options; + config.createConnection = options.createConnection || undefined; + config.waitForConnections = (options.waitForConnections === undefined) + ? true : Boolean(options.waitForConnections); + config.connectionLimit = (options.connectionLimit === undefined) + ? 10 : Number(options.connectionLimit); + + this._allConnections = []; + this._freeConnections = []; + this._connectionQueue = []; + this._closed = false; +} + +Pool.prototype.getConnection = function(cb) { + if (this._closed) { + cb(new Error('Pool is closed.')); + return; + } + + if (this._freeConnections.length > 0) { + var connection = this._freeConnections[0]; + this._freeConnections.shift(); + cb(null, connection); + } else if (this.config.connectionLimit == 0 || this._allConnections.length < this.config.connectionLimit) { + var self = this; + var connection = this._createConnection(function() { + if (self._closed) { + cb(new Error('Pool is closed.')); + } else { + cb(null, connection); + } + }); + this._allConnections.push(connection); + } else if (this.config.waitForConnections) { + this._connectionQueue.push(cb); + } else { + cb(new Error('No connections available.')); + } +}; + +Pool.prototype.releaseConnection = function(connection) { + if (connection._poolRemoved) { + // The connection has been removed from the pool and is no longer good. + if (this._connectionQueue.length) { + var cb = this._connectionQueue[0]; + this._connectionQueue.shift(); + process.nextTick(this.getConnection.bind(this, cb)); + } + } else if (this._connectionQueue.length) { + var cb = this._connectionQueue[0]; + this._connectionQueue.shift(); + process.nextTick(cb.bind(null, null, connection)); + } else { + this._freeConnections.push(connection); + } +}; + +Pool.prototype.end = function(cb) { + this._closed = true; + cb = cb || function(err) { if( err ) throw err; }; + var self = this; + var closedConnections = 0; + var calledBack = false; + var endCB = function(err) { + if (calledBack) { + return; + } else if (err) { + calledBack = true; + delete endCB; + cb(err); + } else if (++closedConnections >= self._allConnections.length) { + calledBack = true; + delete endCB; + cb(); + } + }; + + if (this._allConnections.length == 0) { + endCB(); + return; + } + + for (var i = 0; i < this._allConnections.length; ++i) { + var connection = this._allConnections[i]; + connection.destroy = connection._realDestroy; + connection.end = connection._realEnd; + connection.end(endCB); + } +}; + +Pool.prototype._createConnection = function(on_connect) { + var self = this; + var config = this.config.conn_options; + var connection = (this.config.createConnection) + ? this.config.createConnection(config) + : Mysql.createTCPClient(config.host, 0, on_connect); + connection.auth(config.database, config.user, config.password); + + connection._realEnd = connection.end; + connection.end = function(cb) { + self.releaseConnection(connection); + if (cb) cb(); + }; + + connection._realDestroy = connection.close; + connection.destroy = function() { + self._removeConnection(connection); + connection.close(); + }; + + // When a fatal error occurs the connection's protocol ends, which will cause + // the connection to end as well, thus we only need to watch for the end event + // and we will be notified of disconnects. + connection.on('end', this._handleConnectionEnd.bind(this, connection)); + connection.on('error', this._handleConnectionError.bind(this, connection)); + + return connection; +}; + +Pool.prototype._handleConnectionEnd = function(connection) { + if (this._closed || connection._poolRemoved) { + return; + } + this._removeConnection(connection); +}; + +Pool.prototype._handleConnectionError = function(connection) { + if (this._closed || connection._poolRemoved) { + return; + } + this._removeConnection(connection); +}; + +Pool.prototype._removeConnection = function(connection) { + connection._poolRemoved = true; + for (var i = 0; i < this._allConnections.length; ++i) { + if (this._allConnections[i] === connection) { + this._allConnections.splice(i, 1); + break; + } + } + for (var i = 0; i < this._freeConnections.length; ++i) { + if (this._freeConnections[i] === connection) { + this._freeConnections.splice(i, 1); + break; + } + } + + connection.end = connection._realEnd; + connection.destroy = connection._realDestroy; + this.releaseConnection(connection); +}; diff --git a/lib/mysql-native/serializers/reader.js b/lib/mysql-native/serializers/reader.js index 6be6d4a..103786d 100644 --- a/lib/mysql-native/serializers/reader.js +++ b/lib/mysql-native/serializers/reader.js @@ -19,8 +19,7 @@ reader.prototype.dump = function() // libmysql sets all fields to zero when binary packet has zero length function zeroTime() { - // todo: check how zero date is serialized to output in mysql - return new Date(); + return new Date('0000-00-00 UTC'); } reader.prototype.unpackBinaryTime = function() @@ -63,13 +62,7 @@ reader.prototype.unpackBinaryDateTime = function() } var millisec = (length > 8) ? this.num(4) : 0; var dt = new Date(); - dt.setYear(y); - dt.setMonth(m - 1); - dt.setDate(d); - dt.setHours(hour); - dt.setMinutes(min); - dt.setSeconds(sec); - dt.setMilliseconds(millisec); + dt.setTime(Date.UTC(y, m - 1, d, hour, min, sec, millisec)); return dt; } @@ -83,9 +76,7 @@ reader.prototype.unpackBinaryDate = function() var m = this.num(1); var d = this.num(1); var dt = new Date(); - dt.setYear(y); - dt.setMonth(m - 1); - dt.setDate(d); + dt.setTime(Date.UTC(y, m - 1, d)); return dt; } @@ -154,7 +145,7 @@ function parseIEEE754Double(data, pos) } // deserialise mysql binary field -reader.prototype.unpackBinary = function(type, unsigned) +reader.prototype.unpackBinary = function(type, unsigned, binary) { // http://dev.mysql.com/doc/refman/5.0/en/numeric-types.html @@ -167,20 +158,19 @@ reader.prototype.unpackBinary = function(type, unsigned) case constants.types.MYSQL_TYPE_STRING: case constants.types.MYSQL_TYPE_VAR_STRING: case constants.types.MYSQL_TYPE_BLOB: - result = this.lcstring(); + result = binary ? this.lcbin() : this.lcstring(); break; - // TODO: here are only unsigned version of int deserialisers! case constants.types.MYSQL_TYPE_TINY: - result = this.num(1); + result = this.num(1, unsigned); break; case constants.types.MYSQL_TYPE_SHORT: - result = this.num(2); + result = this.num(2, unsigned); break; case constants.types.MYSQL_TYPE_LONG: - result = this.num(4); + result = this.num(4, unsigned); break; case constants.types.MYSQL_TYPE_LONGLONG: - result = this.num(8); + result = this.num(8, unsigned); break; case constants.types.MYSQL_TYPE_NEWDECIMAL: result = parseFloat(this.lcstring()); @@ -221,17 +211,21 @@ reader.prototype.unpackBinary = function(type, unsigned) } // read n-bytes number -// TODO: add unsigned flag and code to read signed/unsigned integers -reader.prototype.num = function(numbytes) +reader.prototype.num = function(numbytes, unsigned) { var res = 0; var factor = 1; + var fix_sign = 0; + if (unsigned!==undefined && !unsigned) + fix_sign = this.data.charCodeAt(this.pos+numbytes-1) & 0x80 ? -255 : 0; for (var i=0; i < numbytes; ++i) { - res += this.data.charCodeAt(this.pos) * factor; + res += (this.data.charCodeAt(this.pos)+fix_sign) * factor; factor = factor * 256; this.pos++; } + if (fix_sign) + res = res-1; return res; } @@ -282,6 +276,13 @@ reader.prototype.zstring = function() return res; } +reader.prototype.lcbin = function() +{ + var len = this.lcnum(); + var res = this.bytes(len); + return new Buffer(res, 'binary'); +} + reader.prototype.lcstring = function() { var len = this.lcnum(); @@ -361,6 +362,8 @@ reader.prototype.lcnum = function() this.pos++; if (b1 < 251) return b1; + else if (b1 == 251) + return null; else if (b1 == 252) return this.num(2); else if (b1 == 253) @@ -382,14 +385,9 @@ reader.prototype.readPacketHeader = function() reader.prototype.bytes = function(n) { - var res = ""; - var end = this.pos+n; - while(this.pos < end) - { - res += this.data.charAt(this.pos); - this.pos++; - } - return res; + var ret = this.data.substring(this.pos, this.pos+n); + this.pos+=n; + return ret; } module.exports = reader;