node socket内存泄露求解答
发布于 4 年前 作者 SKandAV 3313 次浏览 来自 问答

这是小弟接手的一个小中转程序,主要是通过socket然后到数据库进行一下存储查询操作。但是最近发现有内存泄露的问题,代码不过百来行

var db = require('./db')
var iconv = require('iconv-lite')
function dealres(str, obj) {
    for (let v of Object.values(obj)) {
        str += v + '|'
    }
    return str;
}
async function getmstatus(comAddress) {
    var year = new Date().getFullYear()
    let sql = `select isnull((select top 1 isChargeSuccess from  (select * from eco_ChargeRecord${(year - 1)} union select * from eco_ChargeRecord${year}) as v  where comAddress=@comAddress order by ChargeTime desc),1) isChargeSuccess`
    let status = await db.$query(sql, { comAddress }).catch((err) => {
        console.log(err)
    })
    if (status && (status.length == 0 || status[0].isChargeSuccess == 1)) {
        return true;
    } else {
        return false;
    }
}
async function testm(tempdata) {

    let obj = {
        code: 'E999',
        comAddress: [],
    }
    let PipLineName = tempdata.bname + tempdata.fname + tempdata.rname
    let sqlstr = 'select top 1 m.comAddress,m.ServiceState from  customer_PipLine p ' +
        'inner join customer_PECMRelation pec ' +
        'on p.PipLineID = pec.PipLineID ' +
        'inner join customer_MeterInfo m ' +
        'on pec.MeterID = m.MeterID ' +
        'where p.PipLineName = @PipLineName'

    let comAddress = await db.$query(sqlstr, { PipLineName }).catch((err) => {
        console.log(err)
    })
    obj.comAddress = comAddress
    if (comAddress && comAddress.length > 0) {
        if (comAddress[0].ServiceState != '6CB378A1-EB48-403C-9C74-B46C048F5422') {

            obj.code = 'E002';
        } else {
            obj.code = '0000'
        }
    } else {
        obj.code = 'E001';
    }

    console.log(obj)
    return obj;
}

function checkparams(obj) {
    let temp = true;
    for (let [k, v] of Object.entries(obj)) {
        if (v.length == 0 && k != 'ChargeOperatorName') {
            temp = false
        }
    }
    return temp;
}
var server = net.createServer(function (socket) {
    console.log('xxxx')
});
server.listen(6666);

server.on('connection', function (socket) {
    if (socket) {
        socket.setTimeout(15 * 60 * 1000, function () {
            socket.end();
        });
        console.log(socket.remoteAddress + ":" + socket.remotePort + " client connected");
    }
    socket.on('data', async function (data) {
        data = iconv.decode(data, 'GBK');
        data = data.split('|')
        data = data.map((v) => {
            return v == '\r\n' ? '' : v
        })
        let tempdata = {
            ChargeNo: data[1] || '',//银行流水号
            bname: data[2] || '',//建筑楼栋名称
            fname: data[3] || '',//层名称
            rname: data[4] || '',//房间名称
        }
        switch (data[0]) {
            case '3001':
                let resdata1 = {
                    ChargeNo: data[1],//银行流水
                    code: 'E003',
                    ChargeMoneyAmount: '',//剩余电费
                    ZYDL: '',//总用电量(度)
                    ChargeTime: '',//剩余电费更新时间
                    msg: '缺少必填参数'//附加信息
                }
                if (checkparams(tempdata)) {
                    let comAddressobj = await testm(tempdata)
                    if (comAddressobj.code == 'E001') {
                        resdata1.code = 'E001'
                        resdata1.msg = '对应的电表不存在'

                    } else if (comAddressobj.code == 'E999') {
                        resdata1.code = 'E999'
                        resdata1.msg = '数据库连接异常'
                    }
                    else {
                        if (comAddressobj.code == 'E002') {
                            resdata1.code = 'E002'
                            resdata1.msg = '电表状态不正常,无法充值'
                        } else if (comAddressobj.code == '0000') {
                            resdata1.code = '0000'
                            resdata1.msg = 'success'
                        }
                        let comAddress = comAddressobj.comAddress[0].comAddress
                        let status = await getmstatus(comAddress).catch((err) => {
                            resdata1.code = 'E999'
                            resdata1.msg = '数据库连接异常'
                            let str33 = dealres('4001|', resdata1)
                            socket.write(iconv.encode(str33, 'GBK'))

                        })
                        if (!status) {
                            resdata1.code = 'E002'
                            resdata1.msg = '电表状态不正常,无法充值'
                            let str2 = dealres('4001|', resdata1)
                            socket.write(iconv.encode(str2, 'GBK'))

                        }
                        var year = new Date().getFullYear();
                        let sql1 = 'select p.ZYDL, p.ZYE, CONVERT(varchar(100), p.UpdateDateTime, 20) as UpdateDateTime   from eco_PIA_MonitorData p ' +
                            'left join eco_ChargeRecord' + year + ' r ' +
                            'on p.comAddress = r.comAddress ' +
                            'where p.comAddress = @comAddress'
                        let res1 = await db.$query(sql1, { comAddress }).catch((err) => {
                            if (err) {
                                resdata1.code = 'E999'
                                resdata1.msg = 'fail'
                                let str1 = dealres('4001|', resdata1)
                                socket.write(iconv.encode(str1, 'GBK'))
                                return;
                            }
                        })
                        if (res1 && res1.length > 0) {
                            resdata1.ChargeMoneyAmount = res1[0].ZYE//剩余电费
                            resdata1.ZYDL = res1[0].ZYDL//总用电量(度)
                            resdata1.ChargeTime = res1[0].UpdateDateTime//剩余电费更新时间
                        }
                    }
                }
                let str1 = dealres('4001|', resdata1)
                socket.write(iconv.encode(str1, 'GBK'))
                break;
            case '3002':
                let tempdata1 = {
                    ChargeTime: data[5] || '',//交易时间
                    Order_No: data[6] || '',//缴费中心流水号
                    ChargeMoneyAmount: data[7] || '',//缴费金额
                    ChargeOperatorName: data[8] || '',//缴费人
                }
                let tempobj = Object.assign(tempdata, tempdata1)
                let resdata2 = {
                    ChargeNo: data[1],
                    code: '0000',
                    msg: ''
                }
                if (!checkparams(tempobj)) {
                    resdata2.code = 'E003'
                } else {
                    let comAddressobj2 = await testm(tempdata)
                    if (comAddressobj2.code == 'E002') {
                        resdata2.code = 'E002'
                        resdata2.msg = '电表状态不正常,无法充值'
                        let str2 = dealres('4002|', resdata2)
                        socket.write(iconv.encode(str2, 'GBK'))
                        return;
                    } else if (comAddressobj2.code == 'E001') {
                        resdata2.code = 'E001'
                        resdata2.msg = '对应的电表不存在'
                        let str2 = dealres('4002|', resdata2)
                        socket.write(iconv.encode(str2, 'GBK'))
                        return;
                    } else if (comAddressobj2.code == '0000') {
                        let senddata = {
                            comAddress: comAddressobj2.comAddress[0].comAddress,//表号
                            ChargeMoneyAmount: tempobj.ChargeMoneyAmount,   //金额
                            ChargeOperatorName: tempobj.ChargeOperatorName,//操作人
                            Order_No: tempobj.Order_No,//缴费中心流水
                            ChargeTime: tempobj.ChargeTime,  //缴费时间
                            ChargeNo: tempobj.ChargeNo   //银行流水
                        }
                        let res = await db.$execute('ProWeChatChargeToAgriculture', senddata).catch((err) => {
                            resdata2.code = 'E999'
                            resdata2.msg = '调用存储过程出错'
                            let str2 = dealres('4002|', resdata2)
                            socket.write(iconv.encode(str2, 'GBK'))
                            return;
                        })
                        switch (res) {
                            case 0:
                                resdata2.msg = '记录插入成功,未执行'
                                break;
                            case 1:
                                resdata2.msg = '记录插入成功,到表成功'
                                break;
                            case -1:
                                resdata2.msg = '记录插入失败'
                                resdata2.code = 'E999'
                                break;
                            default:
                                resdata2.msg = '记录插入成功,到表失败'
                                break;
                        }
                        let str2 = dealres('4002|', resdata2)
                        socket.write(iconv.encode(str2, 'GBK'))
                        return;
                    } else {
                        resdata2.code = 'E999'
                        resdata2.msg = '数据库连接异常'
                        let str2 = dealres('4002|', resdata2)
                        socket.write(iconv.encode(str2, 'GBK'))
                        return;
                    }
                }
                break;
            default:
                socket.write(iconv.encode('参数有误', 'GBK'))
                break;
        }
    });
    socket.on('error', function (val) {
        console.log('连接出错', val);
    });
    socket.on('end', function () {
        console.log("连接断开");
    });
});
server.on('error', function (val) {
    console.log("服务出错", val);
});
server.on('close', function (socket) {
    if (socket)
        console.log(socket.remoteAddress + " Client closed");
});
process.on('uncaughtException', function (err) {
    console.log("uncaughtException!");
    //打印出错误
    console.log(err);
    //打印出错误的调用栈方便调试
    console.log(err.stack);
});
process.on('unhandledRejection', error => {
    // Will print "unhandledRejection err is not defined"
    console.log('unhandledRejection', error.message);
});```
并且使用的用户并不多,但是内存肉眼看见的上涨,数据库用的tedious
```const Connection = require('tedious').Connection;
const Request = require('tedious').Request;
const TYPES = require('tedious').TYPES;
const db = {}
const config = {
    'userName': '',
    'password': '',
    'server': '',
    'options': {
        'port': 1433,
        'database': '',
        'encrypt': false,
    }
};
db.$query = (str, obj) => {
    return new Promise((resolve, reject) => {
        var connection = new Connection(config);
        var rows = [];
        connection.on('connect', function (err) {
            if (err) {
                reject(err);
            } else {
                var request = new Request(str, function (err, rowCount) {
                    if (err) {
                        reject(err);
                    }

                    connection.close();
                });

                for (let [key, value] of Object.entries(obj)) {
                    request.addParameter(key, TYPES.NVarChar, value);
                }
                var result = [];
                request.on('row', function (columns, idx) {
                    var obj = {}
                    columns.forEach(function (column) {
                        if (column.value !== null) {
                            var key = column.metadata.colName
                            var val = column.value
                            obj[key] = val
                        }
                    });
                    result.push(obj)
                })

                request.on('doneProc', function (rowCount, more, rows) {

                    if (more === false) {
                        // console.log(result)
                        resolve(result);
                    }

                })
                connection.execSql(request);                                 //执行sql语句
            }
        });
    })
}
db.$execute = (str, obj) => {
    return new Promise((resolve, reject) => {
        var connection = new Connection(config);
        connection.on('connect', function (err) {                 //连接数据库,执行匿名函数
            if (err) {
                reject(err)
            } else {
                var request = new Request(str, function (err) {
                    if (err) {
                        reject(err)
                    };
                    connection.close();
                });

                for (let [key, value] of Object.entries(obj)) {

                    request.addParameter(key, TYPES.NVarChar, value);
                }

                request.addOutputParameter('Result', TYPES.Int);
                request.on('returnValue', function (paramName, value, metadata) {
                    resolve(value)
                });
                connection.callProcedure(request);                                 //执行sql语句
            }
        });
    })
}

希望大佬能百忙中看看,求求了,砰砰砰!磕头了

5 回复
                var request = new Request(str, function (err) {
                    if (err) {
                        reject(err)
                    };
                    connection.close();
                });

你这里的connection.close();会不会执行到?因为这个回调的完整形式是function (err, rowCount, rows) { }是有rows返回的,而你又在request.on(‘row’)接收数据,那么那里会不会执行到? 而且connection应该不需要每次都新建一个

@zengming00 确实,这里是需要修改一下

试试这个开源工具,https://zhuanlan.zhihu.com/p/147576798 有专门的的内存泄露分析功能: image.png

@hyj1991 好的,谢谢好兄弟

@SKandAV 兄弟,你得考虑拆包和粘包的问题啊

const datas = Buffer.alloc();
socket.on('data', async function (data) {
	datas = Buffer.concat([datas,data])
})
        

等拆完包之,判断datas是不是一个完整的包,如果是一个完整包的包,就把前面一段截取下来进行操作。

还有一种方式是基于socket的readable事件去处理,通过socket.read(长度),返回则代表是一个完整的包,返回为Null则代表一个包体还没有传输完整。

因为TCP是基于流的模式传输的,每次传输的TCP包的长度不一样,需要你自己去粘包和拆包。

回到顶部