Divide-and-conquer and hashing: finding the IP addresses that logged in more than a threshold within a time window from a huge log file (Part 2)
        System.out.println(entry.getKey());
        for (Date date : entry.getValue()) {
            System.out.println(date);
        }
    }
    return true;
}
    /**
     * Runs recursively over the split files and finds the IPs that
     * accessed more than the threshold within 5 minutes.
     *
     * @param parent the directory holding the split files
     * @param resMap collects ip -> attack times for IPs over the threshold
     * @throws IOException
     */
    private void recurseFile(Path parent, Map<String, List<Date>> resMap) throws IOException {
        if (!Files.exists(parent) || !Files.isDirectory(parent)) {
            return;
        }
        // Note: Path.iterator() would walk the name elements of the path itself,
        // not the directory entries, so a DirectoryStream lists the children instead.
        try (DirectoryStream<Path> targets = Files.newDirectoryStream(parent)) {
            for (Path path : targets) {
                if (Files.isDirectory(path)) {
                    // still a directory: recurse into it
                    recurseFile(path.toAbsolutePath(), resMap);
                } else {
                    // read all lines of one split file
                    List<String> lines = Files.readAllLines(path, Charset.forName("UTF-8"));
                    judgeAndcollection(lines, resMap);
                }
            }
        }
    }
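As an aside (not from the original post), NIO.2 can also do this traversal with Files.walkFileTree, which handles the recursion and the directory checks internally. A drop-in sketch for the same class, assuming the extra imports java.nio.file.FileVisitResult, java.nio.file.SimpleFileVisitor and java.nio.file.attribute.BasicFileAttributes:

    // Alternative traversal: the visitor is invoked once per regular file, so
    // no manual recursion or isDirectory checks are needed.
    private void walkFiles(Path parent, final Map<String, List<Date>> resMap) throws IOException {
        Files.walkFileTree(parent, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                List<String> lines = Files.readAllLines(file, Charset.forName("UTF-8"));
                judgeAndcollection(lines, resMap);
                return FileVisitResult.CONTINUE;
            }
        });
    }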
    /**
     * Scans the "ip accessTime" lines read from one split file, judges which
     * IPs meet the condition, and puts them into resMap.
     *
     * @param lines  lines read from a split file
     * @param resMap collects ip -> attack times for IPs over the threshold
     */
    private void judgeAndcollection(List<String> lines, Map<String, List<Date>> resMap) {
        if (lines != null) {
            // ip -> List<String> of accessTimes
            Map<String, List<String>> judgeMap = new HashMap<String, List<String>>();
            for (String line : lines) {
                line = line.trim();
                int space = line.indexOf(delimiter);
                if (space < 0) {
                    continue; // skip malformed lines without a delimiter
                }
                String ip = line.substring(0, space);
                List<String> accessTimes = judgeMap.get(ip);
                if (accessTimes == null) {
                    accessTimes = new ArrayList<String>();
                }
                accessTimes.add(line.substring(space + 1).trim());
                judgeMap.put(ip, accessTimes);
            }
            if (judgeMap.size() == 0) {
                return;
            }
            for (Map.Entry<String, List<String>> entry : judgeMap.entrySet()) {
                List<String> accessTimes = entry.getValue();
                // same ip: first make sure there are at least MAGIC accesses overall
                if (accessTimes != null && accessTimes.size() >= MAGIC) {
                    // then check whether MAGIC = 10 of them fall within 5 minutes
                    List<Date> attackTimes = DuplicateUtils.attackList(accessTimes, 5 * 60 * 1000, MAGIC);
                    if (attackTimes != null) {
                        resMap.put(entry.getKey(), attackTimes);
                    }
                }
            }
        }
    }
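DuplicateUtils.attackList is not shown in this post. A minimal sketch of what it could look like, assuming the access times are "yyyy-MM-dd HH:mm:ss" strings and using a sliding window over the sorted timestamps (the timestamp format and the window logic here are assumptions, not the original implementation):

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;

public class DuplicateUtils {
    /**
     * Returns the first run of at least "threshold" access times that all fall
     * within "windowMillis" of each other, or null if no such run exists.
     * Sketch only; the timestamp format is an assumption.
     */
    public static List<Date> attackList(List<String> accessTimes, long windowMillis, int threshold) {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        List<Date> times = new ArrayList<Date>();
        for (String s : accessTimes) {
            try {
                times.add(fmt.parse(s));
            } catch (ParseException e) {
                // skip entries whose timestamp cannot be parsed
            }
        }
        Collections.sort(times);
        // slide a window [i, j] over the sorted times
        int i = 0;
        for (int j = 0; j < times.size(); j++) {
            while (times.get(j).getTime() - times.get(i).getTime() > windowMillis) {
                i++;
            }
            if (j - i + 1 >= threshold) {
                return new ArrayList<Date>(times.subList(i, j + 1));
            }
        }
        return null;
    }
}

Sorting first makes the 5-minute check a single linear pass, regardless of the order in which the log lines arrived.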
    /**
     * @param args
     */
    public static void main(String[] args) {
        String filename = "/DuplicateIP/log.txt";
        DuplicateIP dip = new DuplicateIP(filename);
        try {
            dip.preProcess();
            dip.process();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
public class DuplicateIP {

    private String delimiter = " ";
    private String FILE_PRE = "ip_";
    // MAGIC doubles as the access threshold and as the number of hash buckets;
    // BATCH_MAGIC is the number of lines buffered before the split files are flushed.
    private int MAGIC = 10, BATCH_MAGIC = 500;
    private String root = "/DuplicateIP/";
    private String filename = "";

    public DuplicateIP(final String filename) {
        this.filename = filename;
    }
    /**
     * Pre-processing: split the big file into smaller files.
     * @throws IOException
     */
    private void preProcess() throws IOException {
        BufferedInputStream fis = new BufferedInputStream(new FileInputStream(new File(filename)));
        // read the text file through a 5M buffer
        BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "utf-8"), 5 * 1024 * 1024);
        // Suppose the file is 10G. First split it into small files keyed by the
        // hashcode of the ip, then read and judge each small file. Without the
        // split, putting every ip -> access-time pair into one HashMap exhausts
        // memory once the number of distinct visiting IPs grows large enough.
        // split-file name -> lines buffered for that file
        Map<String, List<String>> hashcodeMap = new HashMap<String, List<String>>();
        String line = "";
        int count = 0;
        while ((line = reader.readLine()) != null) {
            String split[] = line.split(delimiter);
            if (split != null && split.length >= 2) {
                // Splitting by the hashcode of the ip keeps each split file
                // around 1G. The degenerate case is a log whose lines all share
                // a single ip: the split then still yields only one file.
                // Math.abs guards against negative hashcodes, which would
                // otherwise produce bucket names like ip_-3.
                int serial = Math.abs(split[0].trim().hashCode() % MAGIC);
                String splitFilename = FILE_PRE + serial;
                List<String> lines = hashcodeMap.get(splitFilename);
                if (lines == null) {
                    lines = new ArrayList<String>();
                    hashcodeMap.put(splitFilename, lines);
                }
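The original listing breaks off at this point. Judging from the count and BATCH_MAGIC fields, the loop presumably appends each line to its bucket and flushes the buffered lines to the split files every BATCH_MAGIC lines; the following completion is a sketch under that assumption, and writeFile is a hypothetical helper (likely appending to root + splitFilename), not code from the original post.

                lines.add(line);
                count++;
                // Sketch: flush the buckets every BATCH_MAGIC lines so the
                // in-memory buffers stay small (assumption, not the original code).
                if (count % BATCH_MAGIC == 0) {
                    for (Map.Entry<String, List<String>> entry : hashcodeMap.entrySet()) {
                        // writeFile: hypothetical helper that appends the batch
                        // to the split file named entry.getKey()
                        writeFile(entry.getKey(), entry.getValue());
                    }
                    hashcodeMap.clear();
                }
            }
        }
        // flush whatever is still buffered after the loop, then close the reader
        for (Map.Entry<String, List<String>> entry : hashcodeMap.entrySet()) {
            writeFile(entry.getKey(), entry.getValue());
        }
        reader.close();
    }
}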