代码
> var t = new ScopedThread(mapred, 963598, 981805)
> t.start()
> t.join()
现在我们可以放入一些JS代码,这些代码可以产生4个线程,下面来等待结果显示:
代码
> var res = db.runCommand({splitVector: "test.uniques", keyPattern: {dim0: 1}, maxChunkSizeBytes: 32 *1024 * 1024 })
> var keys = res.splitKeys
> keys.length
39
> var mapred = function(min, max) {
return db.runCommand({ mapreduce: "uniques",
map: function () { emit(this.dim0, 1); },
reduce: function (key, values) { return Array.sum(values); },
out: "mrout" + min,
sort: {dim0: 1},
query: { dim0: { $gte: min, $lt: max } } }) }
> var numThreads = 4
> var inc = Math.floor(keys.length / numThreads) + 1
> threads = []; for (var i = 0; i < numThreads; ++i) { var min = (i == 0) 0 : keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) MaxKey : keys[i * inc + inc].dim0 ; print("min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() }
min:0 max:274736
min:274736 max:524997
min:524997 max:775025
min:775025 max:{ "$maxKey" : 1 }
connecting to: test
connecting to: test
connecting to: test
connecting to: test
> for (var i in threads) { var t = threads[i]; t.join(); printjson(t.returnData()); }
{
"result" : "mrout0",
"timeMillis" : 205790,
"counts" : {
"input" : 2750002,
"emit" : 2750002,
"reduce" : 274828,
"output" : 274723
},
"ok" : 1
}
{
"result" : "mrout274736",
"timeMillis" : 189868,
"counts" : {
"input" : 2500013,
"emit" : 2500013,
"reduce" : 250364,
"output" : 250255
},
"ok" : 1
}
{
"result" : "mrout524997",
"timeMillis" : 191449,
"counts" : {
"input" : 2500014,
"emit" : 2500014,
"reduce" : 250120,
"output" : 250019
},
"ok" : 1
}
{
"result" : "mrout775025",
"timeMillis" : 184945,
"counts" : {
"input" : 2249971,
"emit" : 2249971,
"reduce" : 225057,
"output" : 224964
},
"ok" : 1
}
第1个线程所做的工作比其他的要多一点,但时间仍达到了190秒,这意味着多线程并没有比单线程快!
使用多个数据库
这里的问题是,线程之间存在太多锁争用。当锁时,MR不是非常无私(每1000次读取会进行yield)。由于MR任务做了大量写操作,线程之间结束时会等待彼此。由于MongoDB的每个数据库都有独立的锁,那么让我们来尝试为每个线程使用不同的输出数据库:
代码
> var mapred = function(min, max) {
return db.runCommand({ mapreduce: "uniques",
map: function () { emit(this.dim0, 1); },
reduce: function (key, values) { return Array.sum(values); },
out: { replace: "mrout" + min, db: "mrdb" + min },
sort: {dim0: 1},
query: { dim0: { $gte: min, $lt: max } } }) }
> threads = []; for (var i = 0; i < numThreads; ++i) { var min = (i == 0) 0 : keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) MaxKey : keys[i * inc + inc].dim0 ; print("min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() }
min:0 max:274736
min:274736 max:524997
min:524997 max:775025
min:775025 max:{ "$maxKey" : 1 }
connecting to: test
connecting to: test
connecting to: test
connecting to: test
> for (var i in threads) { var t = threads[i]; t.join(); printjson(t.returnData()); }
...
{
"result" : {
"db" : "mrdb274736",
"collection" : "mrout274736"
},
"timeMillis" : 105821,
"counts" : {
"input" : 2500013,
"emit" : 2500013,