`
javasee
  • 浏览: 924785 次
  • 性别: Icon_minigender_1
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

springside3.*中log4j和java.util.concurrent的结合使用

阅读更多

 在springside3.*中的showcase案例中,有一个把log4j的日志存入数据库的演示,下面是我对这个案例的学习笔记。
1、我们首先来看下log4j相关日志的配置:

#Async Database Appender (Store business message)
log4j.appender.DB
= org.springside.examples.showcase.log.appender.QueueAppender
log4j.appender.DB.QueueName
= dblog

#Demo level with Async Database appender 
log4j.logger.DBLogExample
= INFO,Console,DB
log4j.additivity.DBLogExample
= false


其中org.springside.examples.showcase.log.appender.QueueAppender就是对ssLog4j日 志的一个扩展,而日志的event(里面是日志的内容)是存放在一个BlockingQueue中,当有多个日志需要分别存入不同的地方时,就根据 QueryName来区分。
2、接下来看一下org.springside.examples.showcase.log.appender.QueueAppender里面的内容:

/** */ /**
 * Copyright (c) 2005-2009 springside.org.cn
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * 
 * $Id: QueueAppender.java 1189 2010-09-01 17:24:12Z calvinxiu $
 
*/

package  org.springside.examples.showcase.log.appender;

import  java.util.concurrent.BlockingQueue;

import  org.apache.log4j.helpers.LogLog;
import  org.apache.log4j.spi.LoggingEvent;
import  org.springside.examples.showcase.queue.QueuesHolder;

/** */ /**
 * 轻量级的Log4j异步Appender.
 * 
 * 将所有消息放入QueueManager所管理的Blocking Queue中.
 * 
 * 
@see  QueuesHolder
 * 
 * 
@author  calvin
 
*/

public   class  QueueAppender  extends  org.apache.log4j.AppenderSkeleton  {

    
protected  String queueName;

    
protected  BlockingQueue < LoggingEvent >  queue;

    
/** */ /**
     * AppenderSkeleton回调函数, 事件到达时将时间放入Queue.
     
*/

    @Override
    
public   void  append(LoggingEvent event)  {
        
if  (queue  ==   null {
            queue 
=  QueuesHolder.getQueue(queueName);
        }


        
boolean  sucess  =  queue.offer(event);

        
if  (sucess)  {
            LogLog.debug(
" put event to queue success: "   +   new  LoggingEventWrapper(event).convertToString());

        }
  else   {
            LogLog.error(
" Put event to queue fail: "   +   new  LoggingEventWrapper(event).convertToString());
        }

    }


    
/** */ /**
     * AppenderSkeleton回调函数,关闭Logger时的清理动作.
     
*/

    
public   void  close()  {
    }


    
/** */ /**
     * AppenderSkeleton回调函数, 设置是否需要定义Layout.
     
*/

    
public   boolean  requiresLayout()  {
        
return   false ;
    }


    
/** */ /**
     * Log4j根据getter/setter从log4j.properties中注入同名参数.
     
*/

    
public  String getQueueName()  {
        
return  queueName;
    }


    
/** */ /**
     * 
@see  #getQueueName()
     
*/

    
public   void  setQueueName(String queueName)  {
        
this .queueName  =  queueName;
    }

}

这是对Log4j扩展的标准做法,继承abstract class AppenderSkeleton,实现它的abstract  protected   void append(LoggingEvent event) 方法。
而这个方法的实现很简单,就是根据queueName从queuesHolder中取出一个 BlockingQueue<LoggingEvent>,然后把LoggerEvent塞到这个BlockingQueue中去,关于 queuesHolder,下面会讲到。到这为止,log4j的活就完成了,下面的都是concurrent的事了。
3、让我们转到spring的配置文件中,看看springside是如何接收下面的工作,下面是applicationContext-log.xml的片段:

     <!--  消息Queue管理器 -->
    
< bean  class = " org.springside.examples.showcase.queue.QueuesHolder " >
        
< property name = " queueSize "  value = " 1000 "   />
    
</ bean >

    
<!--  读出Queue中日志消息写入数据库的任务  -->
    
< bean id = " jdbcLogWriter "   class = " org.springside.examples.showcase.log.appender.JdbcLogWriter " >
        
< property name = " queueName "  value = " dblog "   />
        
< property name = " batchSize "  value = " 10 "   />
        
< property name = " sql " >
            
< value >
                insert into SS_LOG(THREAD_NAME,LOGGER_NAME,LOG_TIME,LEVEL,MESSAGE)
                values(:thread_name,:logger_name,:log_time,:level,:message)
            
</ value >
        
</ property >
    
</ bean >


我们先从简单的下手,先看QueuesHolder:

private   static  ConcurrentMap < String, BlockingQueue >  queueMap  =   new  MapMaker().concurrencyLevel( 32 ).makeMap(); // 消息队列

/** */ /**
     * 根据queueName获得消息队列的静态函数.
     * 如消息队列还不存在, 会自动进行创建.
     
*/

    
public   static   < T >  BlockingQueue < T >  getQueue(String queueName)  {
        BlockingQueue queue 
=  queueMap.get(queueName);

        
if  (queue  ==   null {
            BlockingQueue newQueue 
=   new  LinkedBlockingQueue(queueSize);

            
// 如果之前消息队列还不存在,放入新队列并返回Null.否则返回之前的值.
            queue  =  queueMap.putIfAbsent(queueName, newQueue);
            
if  (queue  ==   null {
                queue 
=  newQueue;
            }

        }

        
return  queue;
    }

其实这个类很简单,就是一个map,key就是上面log4j配置文件中的queueName,value就是一个BlockingQueue,这样就可以存放多个日志queue,做不同的处理。
4、下面这个是重头戏,先把JdbcLogWriter的代码全贴出来:

/** */ /**
 * Copyright (c) 2005-2009 springside.org.cn
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * 
 * $Id: JdbcAppenderTask.java 353 2009-08-22 09:33:28Z calvinxiu
 
*/

package  org.springside.examples.showcase.log.appender;

import  java.util.HashMap;
import  java.util.List;
import  java.util.Map;

import  javax.annotation.Resource;
import  javax.sql.DataSource;

import  org.apache.log4j.spi.LoggingEvent;
import  org.springframework.dao.DataAccessException;
import  org.springframework.dao.DataAccessResourceFailureException;
import  org.springframework.jdbc.core.namedparam.SqlParameterSource;
import  org.springframework.jdbc.core.namedparam.SqlParameterSourceUtils;
import  org.springframework.jdbc.core.simple.SimpleJdbcTemplate;
import  org.springframework.transaction.PlatformTransactionManager;
import  org.springframework.transaction.TransactionStatus;
import  org.springframework.transaction.support.TransactionCallbackWithoutResult;
import  org.springframework.transaction.support.TransactionTemplate;
import  org.springside.examples.showcase.queue.BlockingConsumer;

import  com.google.common.collect.Lists;
import  com.google.common.collect.Maps;

/** */ /**
 * 将Queue中的log4j event写入数据库的消费者任务.
 * 
 * 即时阻塞的读取Queue中的事件,达到缓存上限后使用Jdbc批量写入模式.
 * 如需换为定时读取模式,继承于PeriodConsumer稍加改造即可.
 * 
 * 
@see  BlockingConsumer
 * 
 * 
@author  calvin
 
*/

public   class  JdbcLogWriter  extends  BlockingConsumer  {

    
protected  String sql;
    
protected   int  batchSize  =   10 ;

    
protected  List < LoggingEvent >  eventsBuffer  =  Lists.newArrayList();
    
protected  SimpleJdbcTemplate jdbcTemplate;
    
protected  TransactionTemplate transactionTemplate;

    
/** */ /**
     * 带Named Parameter的insert sql.
     * 
     * Named Parameter的名称见AppenderUtils中的常量定义.
     
*/

    
public   void  setSql(String sql)  {
        
this .sql  =  sql;
    }


    
/** */ /**
     * 批量读取事件数量, 默认为10.
     
*/

    
public   void  setBatchSize( int  batchSize)  {
        
this .batchSize  =  batchSize;
    }


    
/** */ /**
     * 根据注入的DataSource创建jdbcTemplate.
     
*/

    @Resource
    
public   void  setDataSource(DataSource dataSource)  {
        jdbcTemplate 
=   new  SimpleJdbcTemplate(dataSource);
    }


    
/** */ /**
     * 根据注入的PlatformTransactionManager创建transactionTemplate.
     
*/

    @Resource
    
public   void  setDefaultTransactionManager(PlatformTransactionManager defaultTransactionManager)  {
        transactionTemplate 
=   new  TransactionTemplate(defaultTransactionManager);
    }


    
/** */ /**
     * 消息处理函数,将消息放入buffer,当buffer达到batchSize时执行批量更新函数.
     
*/

    @Override
    
protected   void  processMessage(Object message)  {
        LoggingEvent event 
=  (LoggingEvent) message;
        eventsBuffer.add(event);

        
if  (logger.isDebugEnabled())  {
            logger.debug(
" get event: {} " new  LoggingEventWrapper(event).convertToString());
        }


        
// 已到达BufferSize则执行批量插入操作
         if  (eventsBuffer.size()  >= <sp
1
3
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics