node socket内存泄露求解答
这是小弟接手的一个小中转程序,主要是通过socket然后到数据库进行一下存储查询操作。但是最近发现有内存泄露的问题,代码不过百来行
var db = require('./db')
var iconv = require('iconv-lite')
function dealres(str, obj) {
for (let v of Object.values(obj)) {
str += v + '|'
}
return str;
}
async function getmstatus(comAddress) {
var year = new Date().getFullYear()
let sql = `select isnull((select top 1 isChargeSuccess from (select * from eco_ChargeRecord${(year - 1)} union select * from eco_ChargeRecord${year}) as v where comAddress=@comAddress order by ChargeTime desc),1) isChargeSuccess`
let status = await db.$query(sql, { comAddress }).catch((err) => {
console.log(err)
})
if (status && (status.length == 0 || status[0].isChargeSuccess == 1)) {
return true;
} else {
return false;
}
}
async function testm(tempdata) {
let obj = {
code: 'E999',
comAddress: [],
}
let PipLineName = tempdata.bname + tempdata.fname + tempdata.rname
let sqlstr = 'select top 1 m.comAddress,m.ServiceState from customer_PipLine p ' +
'inner join customer_PECMRelation pec ' +
'on p.PipLineID = pec.PipLineID ' +
'inner join customer_MeterInfo m ' +
'on pec.MeterID = m.MeterID ' +
'where p.PipLineName = @PipLineName'
let comAddress = await db.$query(sqlstr, { PipLineName }).catch((err) => {
console.log(err)
})
obj.comAddress = comAddress
if (comAddress && comAddress.length > 0) {
if (comAddress[0].ServiceState != '6CB378A1-EB48-403C-9C74-B46C048F5422') {
obj.code = 'E002';
} else {
obj.code = '0000'
}
} else {
obj.code = 'E001';
}
console.log(obj)
return obj;
}
function checkparams(obj) {
let temp = true;
for (let [k, v] of Object.entries(obj)) {
if (v.length == 0 && k != 'ChargeOperatorName') {
temp = false
}
}
return temp;
}
var server = net.createServer(function (socket) {
console.log('xxxx')
});
server.listen(6666);
server.on('connection', function (socket) {
if (socket) {
socket.setTimeout(15 * 60 * 1000, function () {
socket.end();
});
console.log(socket.remoteAddress + ":" + socket.remotePort + " client connected");
}
socket.on('data', async function (data) {
data = iconv.decode(data, 'GBK');
data = data.split('|')
data = data.map((v) => {
return v == '\r\n' ? '' : v
})
let tempdata = {
ChargeNo: data[1] || '',//银行流水号
bname: data[2] || '',//建筑楼栋名称
fname: data[3] || '',//层名称
rname: data[4] || '',//房间名称
}
switch (data[0]) {
case '3001':
let resdata1 = {
ChargeNo: data[1],//银行流水
code: 'E003',
ChargeMoneyAmount: '',//剩余电费
ZYDL: '',//总用电量(度)
ChargeTime: '',//剩余电费更新时间
msg: '缺少必填参数'//附加信息
}
if (checkparams(tempdata)) {
let comAddressobj = await testm(tempdata)
if (comAddressobj.code == 'E001') {
resdata1.code = 'E001'
resdata1.msg = '对应的电表不存在'
} else if (comAddressobj.code == 'E999') {
resdata1.code = 'E999'
resdata1.msg = '数据库连接异常'
}
else {
if (comAddressobj.code == 'E002') {
resdata1.code = 'E002'
resdata1.msg = '电表状态不正常,无法充值'
} else if (comAddressobj.code == '0000') {
resdata1.code = '0000'
resdata1.msg = 'success'
}
let comAddress = comAddressobj.comAddress[0].comAddress
let status = await getmstatus(comAddress).catch((err) => {
resdata1.code = 'E999'
resdata1.msg = '数据库连接异常'
let str33 = dealres('4001|', resdata1)
socket.write(iconv.encode(str33, 'GBK'))
})
if (!status) {
resdata1.code = 'E002'
resdata1.msg = '电表状态不正常,无法充值'
let str2 = dealres('4001|', resdata1)
socket.write(iconv.encode(str2, 'GBK'))
}
var year = new Date().getFullYear();
let sql1 = 'select p.ZYDL, p.ZYE, CONVERT(varchar(100), p.UpdateDateTime, 20) as UpdateDateTime from eco_PIA_MonitorData p ' +
'left join eco_ChargeRecord' + year + ' r ' +
'on p.comAddress = r.comAddress ' +
'where p.comAddress = @comAddress'
let res1 = await db.$query(sql1, { comAddress }).catch((err) => {
if (err) {
resdata1.code = 'E999'
resdata1.msg = 'fail'
let str1 = dealres('4001|', resdata1)
socket.write(iconv.encode(str1, 'GBK'))
return;
}
})
if (res1 && res1.length > 0) {
resdata1.ChargeMoneyAmount = res1[0].ZYE//剩余电费
resdata1.ZYDL = res1[0].ZYDL//总用电量(度)
resdata1.ChargeTime = res1[0].UpdateDateTime//剩余电费更新时间
}
}
}
let str1 = dealres('4001|', resdata1)
socket.write(iconv.encode(str1, 'GBK'))
break;
case '3002':
let tempdata1 = {
ChargeTime: data[5] || '',//交易时间
Order_No: data[6] || '',//缴费中心流水号
ChargeMoneyAmount: data[7] || '',//缴费金额
ChargeOperatorName: data[8] || '',//缴费人
}
let tempobj = Object.assign(tempdata, tempdata1)
let resdata2 = {
ChargeNo: data[1],
code: '0000',
msg: ''
}
if (!checkparams(tempobj)) {
resdata2.code = 'E003'
} else {
let comAddressobj2 = await testm(tempdata)
if (comAddressobj2.code == 'E002') {
resdata2.code = 'E002'
resdata2.msg = '电表状态不正常,无法充值'
let str2 = dealres('4002|', resdata2)
socket.write(iconv.encode(str2, 'GBK'))
return;
} else if (comAddressobj2.code == 'E001') {
resdata2.code = 'E001'
resdata2.msg = '对应的电表不存在'
let str2 = dealres('4002|', resdata2)
socket.write(iconv.encode(str2, 'GBK'))
return;
} else if (comAddressobj2.code == '0000') {
let senddata = {
comAddress: comAddressobj2.comAddress[0].comAddress,//表号
ChargeMoneyAmount: tempobj.ChargeMoneyAmount, //金额
ChargeOperatorName: tempobj.ChargeOperatorName,//操作人
Order_No: tempobj.Order_No,//缴费中心流水
ChargeTime: tempobj.ChargeTime, //缴费时间
ChargeNo: tempobj.ChargeNo //银行流水
}
let res = await db.$execute('ProWeChatChargeToAgriculture', senddata).catch((err) => {
resdata2.code = 'E999'
resdata2.msg = '调用存储过程出错'
let str2 = dealres('4002|', resdata2)
socket.write(iconv.encode(str2, 'GBK'))
return;
})
switch (res) {
case 0:
resdata2.msg = '记录插入成功,未执行'
break;
case 1:
resdata2.msg = '记录插入成功,到表成功'
break;
case -1:
resdata2.msg = '记录插入失败'
resdata2.code = 'E999'
break;
default:
resdata2.msg = '记录插入成功,到表失败'
break;
}
let str2 = dealres('4002|', resdata2)
socket.write(iconv.encode(str2, 'GBK'))
return;
} else {
resdata2.code = 'E999'
resdata2.msg = '数据库连接异常'
let str2 = dealres('4002|', resdata2)
socket.write(iconv.encode(str2, 'GBK'))
return;
}
}
break;
default:
socket.write(iconv.encode('参数有误', 'GBK'))
break;
}
});
socket.on('error', function (val) {
console.log('连接出错', val);
});
socket.on('end', function () {
console.log("连接断开");
});
});
server.on('error', function (val) {
console.log("服务出错", val);
});
server.on('close', function (socket) {
if (socket)
console.log(socket.remoteAddress + " Client closed");
});
process.on('uncaughtException', function (err) {
console.log("uncaughtException!");
//打印出错误
console.log(err);
//打印出错误的调用栈方便调试
console.log(err.stack);
});
process.on('unhandledRejection', error => {
// Will print "unhandledRejection err is not defined"
console.log('unhandledRejection', error.message);
});```
并且使用的用户并不多,但是内存肉眼看见的上涨,数据库用的tedious
```const Connection = require('tedious').Connection;
const Request = require('tedious').Request;
const TYPES = require('tedious').TYPES;
const db = {}
const config = {
'userName': '',
'password': '',
'server': '',
'options': {
'port': 1433,
'database': '',
'encrypt': false,
}
};
db.$query = (str, obj) => {
return new Promise((resolve, reject) => {
var connection = new Connection(config);
var rows = [];
connection.on('connect', function (err) {
if (err) {
reject(err);
} else {
var request = new Request(str, function (err, rowCount) {
if (err) {
reject(err);
}
connection.close();
});
for (let [key, value] of Object.entries(obj)) {
request.addParameter(key, TYPES.NVarChar, value);
}
var result = [];
request.on('row', function (columns, idx) {
var obj = {}
columns.forEach(function (column) {
if (column.value !== null) {
var key = column.metadata.colName
var val = column.value
obj[key] = val
}
});
result.push(obj)
})
request.on('doneProc', function (rowCount, more, rows) {
if (more === false) {
// console.log(result)
resolve(result);
}
})
connection.execSql(request); //执行sql语句
}
});
})
}
db.$execute = (str, obj) => {
return new Promise((resolve, reject) => {
var connection = new Connection(config);
connection.on('connect', function (err) { //连接数据库,执行匿名函数
if (err) {
reject(err)
} else {
var request = new Request(str, function (err) {
if (err) {
reject(err)
};
connection.close();
});
for (let [key, value] of Object.entries(obj)) {
request.addParameter(key, TYPES.NVarChar, value);
}
request.addOutputParameter('Result', TYPES.Int);
request.on('returnValue', function (paramName, value, metadata) {
resolve(value)
});
connection.callProcedure(request); //执行sql语句
}
});
})
}
希望大佬能百忙中看看,求求了,砰砰砰!磕头了
5 回复
var request = new Request(str, function (err) {
if (err) {
reject(err)
};
connection.close();
});
你这里的connection.close();会不会执行到?因为这个回调的完整形式是function (err, rowCount, rows) { }是有rows返回的,而你又在request.on(‘row’)接收数据,那么那里会不会执行到? 而且connection应该不需要每次都新建一个
@zengming00 确实,这里是需要修改一下
试试这个开源工具,https://zhuanlan.zhihu.com/p/147576798 有专门的的内存泄露分析功能:
@hyj1991 好的,谢谢好兄弟
@SKandAV 兄弟,你得考虑拆包和粘包的问题啊
const datas = Buffer.alloc();
socket.on('data', async function (data) {
datas = Buffer.concat([datas,data])
})
等拆完包之,判断datas是不是一个完整的包,如果是一个完整包的包,就把前面一段截取下来进行操作。
还有一种方式是基于socket的readable事件去处理,通过socket.read(长度),返回则代表是一个完整的包,返回为Null则代表一个包体还没有传输完整。
因为TCP是基于流的模式传输的,每次传输的TCP包的长度不一样,需要你自己去粘包和拆包。