求推荐 MongoDB 同步到 ElasticSearch 的技术方案
发布于 6 年前 作者 dlyt 5163 次浏览 来自 问答

目前是用 bd 自带的 watch 监听增改事件, 然后同步到 es. 但是服务器挂掉之后就没发同步了, 会出现数据不一致的情况.

有什么方案可以优雅得解决一下吗?

我找到一种方案: MongoDB => Kafka => ElasticSearch 搞了一下发现数据是可以推过去, 但是到 es 的数据结构不行. 我考虑改成 MongoDB => Kafka => Node.js => ElasticSearch 自己处理后推给 es;

{
    "took": 5,
    "timed_out": false,
    "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": 4,
        "max_score": 1,
        "hits": [
            {
                "_index": "estest",
                "_type": "customer",
                "_id": "estest+0+0",
                "_score": 1,
                "_source": {
                    "after": "{\"_id\" : {\"$oid\" : \"5c9c7b471bc46c008b81ec80\"},\"1\" : 1226}",
                    "patch": null,
                    "source": {
                        "version": "0.9.3.Final",
                        "connector": "mongodb",
                        "name": "yuwenyun",
                        "rs": "mgset-13045511",
                        "ns": "yuwenyun.estest",
                        "sec": 1553762695,
                        "ord": 1,
                        "h": 5437490261080768344,
                        "initsync": true
                    },
                    "op": "r",
                    "ts_ms": 1553762704918
                }
            },
            {
                "_index": "estest",
                "_type": "customer",
                "_id": "estest+0+3",
                "_score": 1,
                "_source": {
                    "after": null,
                    "patch": "{\"_id\" : {\"$oid\" : \"5c9c7b471bc46c008b81ec80\"},\"1\" : 1226447}",
                    "source": {
                        "version": "0.9.3.Final",
                        "connector": "mongodb",
                        "name": "yuwenyun",
                        "rs": "mgset-13045511",
                        "ns": "yuwenyun.estest",
                        "sec": 1553764732,
                        "ord": 1,
                        "h": -236075833439367192,
                        "initsync": false
                    },
                    "op": "u",
                    "ts_ms": 1553764741906
                }
            },
            {
                "_index": "estest",
                "_type": "customer",
                "_id": "estest+0+1",
                "_score": 1,
                "_source": {
                    "after": null,
                    "patch": "{\"_id\" : {\"$oid\" : \"5c9c7b471bc46c008b81ec80\"},\"1\" : 12264}",
                    "source": {
                        "version": "0.9.3.Final",
                        "connector": "mongodb",
                        "name": "yuwenyun",
                        "rs": "mgset-13045511",
                        "ns": "yuwenyun.estest",
                        "sec": 1553764557,
                        "ord": 1,
                        "h": -6372310413316992824,
                        "initsync": false
                    },
                    "op": "u",
                    "ts_ms": 1553764566566
                }
            },
            {
                "_index": "estest",
                "_type": "customer",
                "_id": "estest+0+2",
                "_score": 1,
                "_source": {
                    "after": null,
                    "patch": "{\"_id\" : {\"$oid\" : \"5c9c7b471bc46c008b81ec80\"},\"1\" : 122644}",
                    "source": {
                        "version": "0.9.3.Final",
                        "connector": "mongodb",
                        "name": "yuwenyun",
                        "rs": "mgset-13045511",
                        "ns": "yuwenyun.estest",
                        "sec": 1553764691,
                        "ord": 1,
                        "h": -250322674294173573,
                        "initsync": false
                    },
                    "op": "u",
                    "ts_ms": 1553764701044
                }
            }
        ]
    }
}
7 回复

解析 MongoDB 的 oplog。

可以用下这个mongo-es,实际项目中使用过。 或者读下它的源代码,你就能知道如何同步啦

我找了一个方案: MongoDB => Kafka => ElasticSearch; 到 es 的数据结构不行, 还得再想办法解决一下.

你可以试试 mongo-connector

mongo-connector +1

回到顶部