Mahout Algorithm Source Code Analysis: Item-based Collaborative Filtering (1), PreparePreferenceMatrixJob (Part 2)

protected void reduce(VarIntWritable index,
Iterable<VarLongWritable> possibleItemIDs,
Context context) throws IOException, InterruptedException {
long minimumItemID = Long.MAX_VALUE;
for (VarLongWritable varLongWritable : possibleItemIDs) {
long itemID = varLongWritable.get();
if (itemID < minimumItemID) {
minimumItemID = itemID;
}
}
if (minimumItemID != Long.MAX_VALUE) {
context.write(index, new VarLongWritable(minimumItemID));
}
}
At first glance this reducer looks unnecessary, since it still just emits 101 --> 101. The point, presumably, is collision handling: TasteHadoopUtils.idToIndex hashes a long itemID down to a non-negative int, so two different itemIDs can land on the same index, and the reducer deterministically keeps the smallest itemID for each index.
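To make the collision concrete, here is a minimal standalone sketch. The idToIndex below is only a stand-in with the same general shape as TasteHadoopUtils.idToIndex (hash the long itemID, mask it to a non-negative int); it is not Mahout's actual implementation, and the item IDs are made up.
[java]
public class IndexCollisionSketch {

  // stand-in for TasteHadoopUtils.idToIndex: fold the 64-bit ID into a non-negative 32-bit index
  static int idToIndex(long id) {
    return 0x7FFFFFFF & (int) (id ^ (id >>> 32));
  }

  public static void main(String[] args) {
    long itemA = 101L;
    long itemB = (1L << 32) | (101L ^ 1L); // high word 1, low word 100: the fold gives 101 again
    System.out.println(idToIndex(itemA));  // 101
    System.out.println(idToIndex(itemB));  // 101 as well: two itemIDs, one index
    // ItemIDIndexReducer resolves such a collision by keeping the smallest itemID for the index:
    long minimumItemID = Math.min(itemA, itemB);
    System.out.println("index " + idToIndex(itemA) + " --> itemID " + minimumItemID);
  }
}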
The output file is ITEMID_INDEX, and the output format is VarIntWritable --> VarLongWritable (index --> itemID).
That wraps up the analysis of this job.
(2)//convert user preferences into a vector per user
[java]
Job toUserVectors = prepareJob(getInputPath(), getOutputPath(USER_VECTORS), TextInputFormat.class,
ToItemPrefsMapper.class, VarLongWritable.class, booleanData ? VarLongWritable.class : EntityPrefWritable.class,
ToUserVectorsReducer.class, VarLongWritable.class, VectorWritable.class, SequenceFileOutputFormat.class);
Input format: userid,itemid,value (e.g. a line such as 1,101,5.0 means user 1 rated item 101 with a preference of 5.0).
Now the mapper (ToItemPrefsMapper extends ToEntityPrefsMapper and adds nothing of its own, so we look at ToEntityPrefsMapper):
[java]
public void map(LongWritable key,
Text value,
Context context) throws IOException, InterruptedException {
String[] tokens = DELIMITER.split(value.toString());
long userID = Long.parseLong(tokens[0]);
long itemID = Long.parseLong(tokens[1]);
if (itemKey ^ transpose) {
// If using items as keys, and not transposing items and users, then users are items!
// Or if not using items as keys (users are, as usual), but transposing items and users,
// then users are items! Confused?
long temp = userID;
userID = itemID;
itemID = temp;
}
if (booleanData) {
context.write(new VarLongWritable(userID), new VarLongWritable(itemID));
} else {
float prefValue = tokens.length > 2 ? Float.parseFloat(tokens[2]) + ratingShift : 1.0f;
context.write(new VarLongWritable(userID), new EntityPrefWritable(itemID, prefValue));
}
}
Of all this code, the last two statements are the most important. One computes the preference value; it is not obvious what adding ratingShift is for here, since ratingShift is 0.0 anyway. The final output is userID --> [itemID, prefValue].
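As a quick worked illustration of that prefValue line, here is a self-contained snippet; the sample record and the simplified delimiter are assumptions made up for the example, and ratingShift keeps its 0.0 default:
[java]
public class PrefValueSketch {
  public static void main(String[] args) {
    String line = "1,101,5.0";               // userid,itemid,value (made-up record)
    String[] tokens = line.split("[\t,]");   // simplified stand-in for the DELIMITER pattern
    float ratingShift = 0.0f;
    long userID = Long.parseLong(tokens[0]);
    long itemID = Long.parseLong(tokens[1]);
    float prefValue = tokens.length > 2
        ? Float.parseFloat(tokens[2]) + ratingShift  // explicit rating: 5.0 + 0.0 = 5.0
        : 1.0f;                                      // no third column: boolean-style 1.0
    System.out.println(userID + " --> [" + itemID + "," + prefValue + "]");
  }
}
It prints 1 --> [101,5.0], which matches the userID --> [itemID,prefValue] output described above.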
Now look at the reducer:
[java]
protected void reduce(VarLongWritable userID,
Iterable<VarLongWritable> itemPrefs,
Context context) throws IOException, InterruptedException {
Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
for (VarLongWritable itemPref : itemPrefs) {
int index = TasteHadoopUtils.idToIndex(itemPref.get());
float value = itemPref instanceof EntityPrefWritable ? ((EntityPrefWritable) itemPref).getPrefValue() : 1.0f;
userVector.set(index, value);
}
if (userVector.getNumNondefaultElements() >= minPreferences) {
VectorWritable vw = new VectorWritable(userVector);
vw.setWritesLaxPrecision(true);
context.getCounter(Counters.USERS).increment(1);
context.write(userID, vw);
}
}
First, a note on why the mapper's output value is EntityPrefWritable while the Iterable here receives VarLongWritable: the former extends the latter. Next, all of a user's preferences are written into a single vector, using the hashed itemID (via TasteHadoopUtils.idToIndex) as the vector index and prefValue as the value. Finally there is a check: the vector is written out only if it contains at least minPreferences items (which shows what that parameter is for); otherwise it is dropped. In addition, a Counters.USERS counter is incremented to count the number of users.
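To see the reducer's logic outside MapReduce, here is a small sketch that builds one user's vector. TasteHadoopUtils.idToIndex and RandomAccessSparseVector are the same Mahout classes used above, but the ratings and the minPreferences value are made-up assumptions:
[java]
import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;

public class UserVectorSketch {
  public static void main(String[] args) {
    int minPreferences = 1;                   // same role as the job's minPreferences option
    long[] itemIDs = {101L, 102L, 103L};      // one user's items (made up)
    float[] prefs  = {5.0f, 3.0f, 2.5f};      // and the matching preference values

    Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
    for (int i = 0; i < itemIDs.length; i++) {
      // hashed itemID becomes the vector index, prefValue the entry
      userVector.set(TasteHadoopUtils.idToIndex(itemIDs[i]), prefs[i]);
    }

    // only "emit" the vector if it has at least minPreferences non-default entries
    if (userVector.getNumNondefaultElements() >= minPreferences) {
      System.out.println("user vector with " + userVector.getNumNondefaultElements() + " items");
    }
  }
}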
The output of this job is USER_VECTORS, in the format userid --> vector.
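If you want to peek at that output, a SequenceFile reader along these lines should work; the part-file path below is hypothetical and depends on where the job writes its temporary output:
[java]
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.VectorWritable;

public class DumpUserVectors {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // hypothetical path: USER_VECTORS is a SequenceFile of VarLongWritable --> VectorWritable
    Path path = new Path("temp/preparePreferenceMatrix/userVectors/part-r-00000");
    try (SequenceFile.Reader reader =
             new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
      VarLongWritable userID = new VarLongWritable();
      VectorWritable vector = new VectorWritable();
      while (reader.next(userID, vector)) {
        System.out.println(userID.get() + " --> " + vector.get());
      }
    }
  }
}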