pos机系统日切请稍后重试,Java 零入侵记录系统运行日志

 新闻资讯  |   2023-04-23 11:30  |  投稿人:pos机之家

网上有很多关于pos机系统日切请稍后重试,Java 零入侵记录系统运行日志的知识,也有很多人为大家解答关于pos机系统日切请稍后重试的问题,今天pos机之家(www.poszjia.com)为大家整理了关于这方面的知识,让我们一起来看下吧!

本文目录一览:

1、pos机系统日切请稍后重试

pos机系统日切请稍后重试

在做项目时无意间想不通过切面形式并且支持独立部署来实现系统日志的采集,通过检索了解了Logstash、Filebeat、Flume、Fluentd、Logagent、rsyslog这些采集器,感觉太复杂了。通过借鉴开源项目,开发一个简版的采集器。

源代码

/** * 系统层文件监听 */public static void fileMonitor(){ /** 关注目录的事件 */ int mask = JNotify.FILE_CREATED | JNotify.FILE_DELETED | JNotify.FILE_MODIFIED| JNotify.FILE_RENAMED| JNotify.FILE_ANY; /** 是否监视子目录,即级联监视 */ boolean watchTree = true; MyJNotifyAdapter jNotifyListener = new MyJNotifyAdapter(); try { int watchID = JNotify.addWatch(monitorDir, mask, watchTree, jNotifyListener); System.out.println("监听程序Id:" + watchID); } catch (JNotifyException e) { e.printStackTrace(); }}

public class MyJNotifyAdapter extends JNotifyAdapter { private static Logger logger = LoggerFactory.getLogger(MyJNotifyAdapter.class); public static final ConcurrentSkipListMap<String,Long> CONTEXT_TIME = new ConcurrentSkipListMap<>(); @Override public void fileCreated(int wd, String rootPath, String name) { logger.info("创建文件 -> 当前线程:{},监听程序:{},监听路径:{},文件名称:{}",Thread.currentThread().getName(),wd,rootPath,name); } @Override public void fileDeleted(int wd, String rootPath, String name) { logger.info("删除文件 -> 当前线程:{},监听程序:{},监听路径:{},文件名称:{}",Thread.currentThread().getName(),wd,rootPath,name); } /** * 注意:JNotify存在一次文件修改,触发多次fileModified方法的BUG, * 该方法可以用来修复一次文件修改可能会触发多个fileModified方法, * 从而减少没有必要的资源重新加载。但是由于t变量是类内共享变量,所 * 以注意线程安全,尽量避免共用Listener导致错误 */ @Override public void fileModified(int wd, String rootPath, String name) { logger.info("修改文件 -> 当前线程:{},监听程序:{},监听路径:{},文件名称:{}",Thread.currentThread().getName(),wd,rootPath,name); String filePath = rootPath + File.separator + name; File file = new File(filePath); if(!file.isDirectory() && file.exists()){ long lastModified = file.lastModified(); //避免文件修改时重复读取数据 if(CONTEXT_TIME.get(String.valueOf(lastModified)) != null){ System.out.println("避免文件修改时重复读取数据"); }else{ CONTEXT_TIME.put(String.valueOf(lastModified),lastModified); Map<String,Object> hashMap = new HashMap<>(); hashMap.put("wd",wd); hashMap.put("filePath",filePath); hashMap.put("matchStr","start"); MyJNotifyAdapterPublisher.publishEvent(hashMap); } } } @Override public void fileRenamed(int wd, String rootPath, String oldName, String newName) { logger.info("修改文件名称 -> 当前线程:{},监听程序:{},监听路径:{},旧文件名称:{},新文件名称:{}",Thread.currentThread().getName(),wd,rootPath,oldName,newName); } static { GlobalThreadPool.scheduledThreadPool.scheduleWithFixedDelay(() -> { int size = CONTEXT_TIME.size(); if(size > 5){ String lastKey = CONTEXT_TIME.lastKey(); CONTEXT_TIME.keySet().removeIf(key -> !key.equals(lastKey)); logger.info("定时清理Map中的值:{}",CONTEXT_TIME); } },0, 60 * 60, TimeUnit.MILLISECONDS); }}

public class MyJNotifyAdapterPublisher { public static void publishEvent(Map<String,Object> map){ SpringContextUtils.publishEvent(new MyJNotifyAdapterEvent(map)); }}

/** * 日志采集器 */@Componentpublic class MyJNotifyAdapterListener { private static Logger log = LoggerFactory.getLogger(MyJNotifyAdapterListener.class); /** * @param event */ @Async(value = "taskExecutor") @EventListener(MyJNotifyAdapterEvent.class) public void fileModified(MyJNotifyAdapterEvent event){ Map<String, Object> map = (Map) event.getSource(); log.info("文件修改参数:{}",map); //1.读取文件 LogFileHandler.handler(map); }}

public class LogFileHandler { private static Logger log = LoggerFactory.getLogger(LogFileHandler.class); /** * 开始日志处理 * @param map */ public static void handler(Map<String,Object> map){ MyAbstractFilter myAbstractFilter = new MyAbstractFilter(); myAbstractFilter.addFilter(new ReadFileFilter()) .addFilter(new BranchFileFilter()) .addFilter(new AnalysisFileFilter()) .addFilter(PolymerizeFilter.getInstance()); myAbstractFilter.doFilter(map,myAbstractFilter); }}

/** * 读取日志文件 */public class ReadFileFilter implements MyBaseFilter { private static final Logger log = LoggerFactory.getLogger(ReadFileFilter.class); public static final Map<String,Long> CONTEXT_HOLDER = new ConcurrentSkipListMap<>(); @Override public void doFilter(Map<String, Object> map, MyBaseFilter myBaseFilter) { String str = readFile(map); if(StrUtil.isBlank(str)){ return; } map.put("input",str); myBaseFilter.doFilter(map,myBaseFilter); } /** * 读取指定大小的文件内容 * @param map -> filePath 文件路径 * -> size 读取指定大小的内容 * @return string 读取的内容 */ private String readFile(Map<String, Object> map){ StringBuilder stringBuilder = new StringBuilder(); String filePath = String.valueOf(map.get("filePath")); String key = MD5Util.getMD5_32(filePath.replaceAll("/","")); long pos = 0; if(CONTEXT_HOLDER.get(key) != null){ pos = CONTEXT_HOLDER.get(key); if(pos < 0){ pos = 0; } } BufferedRandomAccessFile bufferedRandomAccessFile = null; try { File file = new File(filePath); if(file.isDirectory()){ return null; } bufferedRandomAccessFile = new BufferedRandomAccessFile(filePath,"r"); long length = bufferedRandomAccessFile.length(); //避免文件清空后读取异常 if(length <= 0){ CONTEXT_HOLDER.put(key,0L); return null; } //每次自读1024 * 512个字节 int capacity = 1024 * 512; long readPointer = length - pos; //指定读取的起始位置 bufferedRandomAccessFile.seek(pos); int capacityLength = -1; if(readPointer <= capacity && readPointer > 0){ capacityLength = (int) readPointer; }else if(readPointer > capacity){ capacityLength = capacity; }else if(readPointer == 0){ capacityLength = (int) length; }else{ capacityLength = (int) - readPointer; } ByteBuffer byteBuffer = ByteBuffer.allocate(capacityLength); int readLength = -1; readLength = bufferedRandomAccessFile.read(byteBuffer.array()); byteBuffer.clear(); byte[] bytes = byteBuffer.array(); String str = new String(bytes, 0, readLength); stringBuilder.append(str); long pointer = bufferedRandomAccessFile.getFilePointer(); CONTEXT_HOLDER.put(key,pointer); //log.info("文件内容:{},容量是:{},指针位置:{}", stringBuilder, byteBuffer.capacity() ,pointer); return stringBuilder.toString(); } catch (IOException e) { e.printStackTrace(); return null; } finally { if(bufferedRandomAccessFile != null){ try { bufferedRandomAccessFile.close(); } catch (IOException e) { e.printStackTrace(); } } } }}

/** * 对读取到日志数据进行分行 */public class BranchFileFilter implements MyBaseFilter { @Override public void doFilter(Map<String, Object> map, MyBaseFilter myBaseFilter) { List<String> list = branch(map); if(list.size() == 0){ return; } map.put("list", list); myBaseFilter.doFilter(map,myBaseFilter); } /** * 分行 * 如果指定了行首正则表达式,则根据行首正则表达式对每次读取的日志进行分行,切分成多条日志; * 如果没有指定行首正则表达式,则将一行日志作为一条日志处理 */ private List<String> branch(Map<String,Object> map){ String input = String.valueOf(map.get("input")); String matchStr = String.valueOf(map.get("matchStr")); Map<Integer,Integer> hashMap = handlerPosition(input,matchStr); if(hashMap == null){ return null; } List<String> list = section(input,hashMap); return list; } /** * 计算出每一对开始位置与结束位置 * @param input 数据源 * @param matchStr 指定的字符串 * @return Map 开始位置与结束位置 */ private Map<Integer,Integer> handlerPosition(String input,String matchStr){ List<Integer> list = getListPosition(input,matchStr); if(list == null){ return null; } Map<Integer,Integer> hashMap = new HashMap<>(); int size = list.size(); if(size % 2 == 0){ for(int i = 0; i < size; i++){ if(i + 2 <= size){ hashMap.put(list.get(i),list.get(i+1)); } } } else{ for(int i = 0; i < size; i++){ if(i + 1 < size ){ hashMap.put(list.get(i),list.get(i+1)); } } } Map<Integer,Integer> result = new LinkedHashMap<>(); hashMap.entrySet().stream() .sorted(Map.Entry.comparingByKey()) .forEachOrdered(x -> result.put(x.getKey(), x.getValue())); return result; } /** * 获取指定的字符串出现的所有的位置 * @param input 数据源 * @param matchStr 指定的字符串 * @return */ private List<Integer> getListPosition(String input,String matchStr){ if(StrUtil.isNotBlank(input)){ List<Integer> list = new ArrayList<>(); int position = input.indexOf(matchStr); if(position == -1){ return null; } list.add(position); while (position != -1){ position = input.indexOf(matchStr, position + 1); list.add(position); } return list; } return null; } /** * 根据位置截取每段日志 * @param input * @param map * @return */ private List<String> section(String input,Map<Integer,Integer> map){ List<String> list = new ArrayList<>(); String section = ""; for(Map.Entry<Integer, Integer> entry : map.entrySet()){ if(entry.getValue() != -1){ section = input.substring(entry.getKey(),entry.getValue()); }else{ section = input.substring(entry.getKey()); } list.add(section); } return list; }}

/** * 对分行的日志数据进行解析 * 1、开启丢弃解析失败日志,则直接丢弃该日志,并上报解析失败的报错信息。 * 2、关闭丢弃解析失败日志,则上传解析失败的原始日志,其中Key为raw_log、Value为日志内容。 * 3、设置日志时间字段 * 3.1、如果未配置时间字段,则日志时间为当前解析日志的时间。 * 3.2、如果配置了时间字段: * 日志中记录的时间距离当前时间12小时以内,则从解析的日志字段中提取时间。 * 日志中记录的时间距离当前时间12小时以上,则丢弃该日志并上传错误信息。 */public class AnalysisFileFilter implements MyBaseFilter { Pattern pattern = Pattern.compile("(\\\\d{1,4}[-|\\\\/]\\\\d{1,2}[-|\\\\/]\\\\d{1,2} \\\\d{1,2}:\\\\d{1,2}:\\\\d{1,2})", Pattern.CASE_INSENSITIVE|Pattern.MULTILINE); @Override public void doFilter(Map<String, Object> map, MyBaseFilter myBaseFilter) { List<Map<String, Object>> list = analysis(map); if(list.size() == 0){ return; } map.put("list",list); myBaseFilter.doFilter(map,myBaseFilter); } /** * 解析 * 根据配置的采集模式,对每条日志内容进行解析 * 开启丢弃解析失败日志,则直接丢弃该日志,并上报解析失败的报错信息 * 关闭丢弃解析失败日志,则上传解析失败的原始日志,其中Key为raw_log、Value为日志内容 */ private List<Map<String, Object>> analysis(Map<String, Object> map){ List<Map<String, Object>> arrayList = new ArrayList<>(); Map<String, Object> hashMap; List<String> list = (List<String>) map.get("list"); Matcher matcher; String time = DateUtil.current(); for(String str : list){ hashMap = new HashMap<>(); matcher = pattern.matcher(str); if(matcher.find() && matcher.groupCount() >= 1){ time = matcher.group(0); } hashMap.put("value", str); hashMap.put("time", time); arrayList.add(hashMap); } return arrayList; }}

/** * 聚合日志 */public class PolymerizeFilter implements MyBaseFilter{ List<Map<String,Object>> list = new ArrayList<>(); private static class SingletonClassInstance{ private static final PolymerizeFilter instance= new PolymerizeFilter(); } private PolymerizeFilter(){} public static PolymerizeFilter getInstance(){ return SingletonClassInstance.instance; } @Override public void doFilter(Map<String, Object> map, MyBaseFilter myBaseFilter) { List<Map<String, Object>> arrayList = (List<Map<String, Object>>) map.get("list"); this.list.addAll(arrayList); polymerize(list); return; } /** * 聚合日志: * 为降低网络请求次数,在日志处理、过滤完毕后,会在系统内部缓存一段时间后进行聚合打包,再发送到日志服务。 * 缓存数据后,如果满足以下条件之一,则即时打包日志发送到日志服务。 * 日志聚合时间超过3秒。 * 日志聚合条数超过4096条。 * 日志聚合总大小超过512KB。 */ public void polymerize(List<Map<String,Object>> list){ if(list.size() >= 4096){ sendLog(list); } if(getByteSize(list) / 1024 >= 512){ sendLog(list); } if(list.size() > 0){ sendLog(list); } } /** * 发送日志: * 将采集到的日志聚合并发送到日志服务。 * 设置启动参数配置文件中的max_bytes_per_sec参数和send_request_concurrency参数可调整日志数据的发送速度和最大并发数 * 如果数据发送失败,Logtail自动根据错误信息决定重试或放弃发送。 */ private void sendLog(List<Map<String,Object>> list){ System.out.println("发送日志--->list"+list); Map<String,Object> hashMap = new HashMap<>(); hashMap.put("list",list); LogPublisher.publishEvent(hashMap); //this.list.clear(); } /** * 得到数据字节大小 * @param list * @return */ private long getByteSize(List<?> list){ long byteSize; ByteArrayOutputStream baos = null; ObjectOutputStream os = null; try { baos = new ByteArrayOutputStream(); os = new ObjectOutputStream(baos); os.writeObject(list); byteSize = baos.size(); return byteSize; } catch (IOException e) { e.printStackTrace(); } finally { if(os != null){ try { os.close(); } catch (IOException e) { e.printStackTrace(); } } if(baos != null){ try { baos.close(); } catch (IOException e) { e.printStackTrace(); } } } return 0; } public List<Map<String, Object>> getList() { return list; } public void clareList(){ this.list.clear(); }}

/** * 日志推送 */@Componentpublic class LogListener { private static Logger log = LoggerFactory.getLogger(LogListener.class); /** * 需要使用队列接收传入的数据,日志实时产生,处理日志则需要时间 * @param event */ @Async(value = "taskExecutor") @EventListener(LogEvent.class) public void fileModified(LogEvent event){ Map<String,Object> list = (Map<String,Object>) event.getSource(); log.info("保存采集的日志数据:{}",list); //保存采集到的日志到数据库 // PolymerizeFilter.getInstance().clareList(); }}

以上就是关于pos机系统日切请稍后重试,Java 零入侵记录系统运行日志的知识,后面我们会继续为大家整理关于pos机系统日切请稍后重试的知识,希望能够帮助到大家!

转发请带上网址:http://www.poszjia.com/news/25692.html

你可能会喜欢:

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 babsan@163.com 举报,一经查实,本站将立刻删除。