Fork me on GitHub

FastDFS-Client使用说明

Published on:
Tags: Fastdfs

项目地址

fastdfs-client

   

fastdfs-client是什么

fastdfs-client是一个访问fastdfs的Java客户端框架,帮助开发人员快速使用分布式文件系统的工具,封装了TrackerClient操作来管理存储节点,封装了StorageClient操作来执行文件上传下载功能。

change log

V1.1.0

  1. 修改download文件receive时带入的inputStream对象,inputStream对象修改为克隆socket的inputstream,避免污染连接池中的socket对象,当业务回调不读取留时会影响下一次连接池中获取的socket对象。
  2. 在使用1.0.0版本进行download文件时,建议使用DownloadCallback的实现类:DownloadByteArray和DownloadFileWriter不要自己去实现,不要关闭receive方法传入的inputStream对象。
  3. 在使用1.1.0版本进行download文件时,receive传入的inputStream是克隆的,因此使用完后必须进行关闭操作。

V1.0.0

  1. 包装Request和Response报文解析
  2. 包装Storage和Tracker操作命令
  3. 增加连接池提升使用性能

接口方法

StorageClient

1.png

TrackerClient

2.png

Maven引入

<dependency>
  <groupId>cn.tsoft.framework</groupId>
  <artifactId>fastdfs-client</artifactId>
  <version>1.0.0-SNAPSHOT</version>
</dependency>

Spring引入

<import resource="classpath:spring-fastdfs.xml"/>

Client使用

@Autowired
private StorageClient storageClient;
 
//上传
String path = ClassLoader.getSystemResource("123456.txt").getPath();
File file = new File(path);
FileInputStream fileInputStream = FileUtils.openInputStream(file);
//方式1
StorePath storePath = storageClient.uploadFile("group1", fileInputStream, file.length(), "txt");
//方式2
StorePath storePath = storageClient.uploadFile(fileInputStream, file.length(), "txt");
//方式3
StorePath storePath = storageClient.uploadFile(path);
//方式4
StorePath storePath = storageClient.uploadFile(path, "txt");
//方式5
StorePath storePath = storageClient.uploadFile("group1", path, "txt");
//方式6
StorePath storePath = storageClient.uploadFile(fileInputStream, file.length(), "txt", metaDataSet);
//上传文件并增加元数据
Set<MateData> metaDataSet = new HashSet<MateData>();
MateData mateData = new MateData("mateDataName","mateDataValue");
metaDataSet.add(mateData);
StorePath storePath = storageClient.uploadFile("group1", fileInputStream, file.length(), "txt", metaDataSet);
 
//上传从文件,一个主文件可以挂多个从文件
//方式1
String masterFileId = storePath.getFullPath();
String[] parts = new String[2];
splitFileId(masterFileId, parts);
storePath = storageClient.uploadSlaveFile(parts[0], parts[1], fileInputStream, file.length(), "-1", "txt");
//方式2
storePath = storageClient.uploadSlaveFile(masterFileId, fileInputStream, file.length(), "-1", "xlsx");
         
fileInputStream.close();
 
 
//下载
//方式1
String path = ClassLoader.getSystemResource("123456.txt").getPath();
StorePath storePath = storageClient.uploadFile("group1", path, "txt");
addResultFileId(storePath.getFullPath());
DownloadFileWriter downloadFileWriter = new DownloadFileWriter(path.replaceAll("123456.txt", "123456downlaod1.txt"));
String filePath = storageClient.downloadFile(storePath.getGroup(), storePath.getPath(), downloadFileWriter);
//方式2
DownloadFileWriter downloadFileWriter = new DownloadFileWriter(path.replaceAll("123456.txt", "123456downlaod2.txt"));
String filePath = storageClient.downloadFile(storePath.getFullPath(), downloadFileWriter);
//方式3
DownloadFileWriter downloadFileWriter = new DownloadFileWriter(path.replaceAll("123456.txt", "123456downlaod3.txt"));
String filePath = storageClient.downloadFile(storePath.getGroup(), storePath.getPath(), 10, 0, downloadFileWriter); 
//方式4
DownloadFileWriter downloadFileWriter = new DownloadFileWriter(path.replaceAll("123456.txt", "123456downlaod4.txt"));
String filePath = storageClient.downloadFile(storePath.getFullPath(), 5, 0, downloadFileWriter);
 
//删除
//方式1
String path = ClassLoader.getSystemResource("123456.txt").getPath();
StorePath storePath = storageClient.uploadFile("group1", path, "txt");
boolean flag = storageClient.deleteFile(storePath.getGroup(), storePath.getPath());
//方式2
boolean flag = storageClient.deleteFile(storePath.getFullPath());
 
//获取文件信息
//方式1
String path = ClassLoader.getSystemResource("123456.txt").getPath();
StorePath storePath = storageClient.uploadFile("group1", path, "txt");
addResultFileId(storePath.getFullPath());
String fileId = storePath.getFullPath();
FileInfo fileInfo = storageClient.getFileInfo(storePath.getGroup(), storePath.getPath());
//方式2
FileInfo fileInfo = storageClient.getFileInfo(fileId);
 
//获取文件元数据
//方式1
String masterFileId = storePath.getFullPath();
String[] parts = new String[2];
splitFileId(masterFileId, parts);
Set<MateData> mateDataSet = storageClient.getMetadata(parts[0], parts[1]);
//方式2
Set<MateData> mateDataSet = storageClient.getMetadata(masterFileId);
 
//覆盖文件元数据
//方式1
String[] parts = new String[2];
splitFileId(masterFileId, parts);
mateDataSet = new HashSet<MateData>();
mateDataSet.add(new MateData("key5", "value5"));
mateDataSet.add(new MateData("key6", "value6"));
mateDataSet.add(new MateData("key7", "value7"));
boolean flag = storageClient.overwriteMetadata(parts[0], parts[1], mateDataSet);
//方式2
boolean flag = storageClient.overwriteMetadata(masterFileId, mateDataSet);
 
//合并文件元数据
//方式1
String[] parts = new String[2];
splitFileId(masterFileId, parts);
mateDataSet = new HashSet<MateData>();
mateDataSet.add(new MateData("key5", "value5"));
mateDataSet.add(new MateData("key6", "value6"));
mateDataSet.add(new MateData("key7", "value7"));
boolean flag = storageClient.mergeMetadata(parts[0], parts[1], mateDataSet);
//方式2
boolean flag = storageClient.mergeMetadata(masterFileId, mateDataSet);
 
//一下方法就不具体介绍
//续传文件
appendFile
//修改续传文件
modifyFile
//清除续传文件
truncateFile

ps.TrackerClient的操作是配合StorageClient使用,我们在正常业务使用中一般不会用到它。

FastDFS-nginx-module使用

上传的文件可以通过nginx直接访问

例如:我们上传的文件获取的文件id:group1/M00/02/92/wKgAMFkekciAC8fhAAJjfD2dq-w10.xlsx

nginx访问路径:http://192.168.0.48:8079/group1/M00/02/92/wKgAMFkekciAC8fhAAJjfD2dq-w10.xlsx

目前nginx模块跟storage存储节点匹配,nginx会通过fastdfs-plugin跟tracker通信将文件的信息路由到不同的storage上去

注意事项

  1. 上传文件后记录fileId,fastdfs不会自动删除文件,所以业务需要进行定期删除无用的文件,避免硬盘消耗过大
  2. rpc之间调用时
    • 以前是rpc client端通过文件byte方式传入rpc server端,这样rpc的请求包过大会导致rpc调用性能急速下降
    • 应修改为通过fastdfs做桥接,rpc client端upload文件到fastdfs,将返回的fileId做参数传入rpc server端 ,rpc server端通过fileid去fastdfs服务器上download文件文件

FastDFS-client-api说明

/**
 * 存储服务(Storage)客户端接口
 * 
 * @author ningyu
 * @date 2017年5月18日 上午11:25:03
 *
 */
public interface StorageClient {
    /**
     * 上传文件
     *
     * @param groupName   组名称
     * @param inputStream 文件输入流
     * @param fileSize    文件大小
     * @param fileExtName 文件扩展名
     * @return 文件存储路径
     */
    StorePath uploadFile(String groupName, InputStream inputStream, long fileSize, String fileExtName);
     
    /**
     * 文件上传
     *
     * @param inputStream 文件输入流
     * @param fileSize    文件大小
     * @param fileExtName 文件扩展名
     * @return
     */
    StorePath uploadFile(InputStream inputStream, long fileSize, String fileExtName);
     
     
    /**
     * 文件上传
     *
     * @param localFilePath 文件完全路径
     * @return
     */
    StorePath uploadFile(String localFilePath);
     
    /**
     * 文件上传
     *
     * @param localFilePath 文件完全路径
     * @param fileExtName   文件后缀名
     * @return
     */
    StorePath uploadFile(String localFilePath, String fileExtName);
     
    /**
     * 文件上传
     *
     * @param groupName     组名称
     * @param localFilePath 文件完全路径
     * @param fileExtName   文件后缀名
     * @return
     */
    StorePath uploadFile(String groupName, String localFilePath, String fileExtName);
 
    /**
     * 上传从文件
     *
     * @param groupName      组名称
     * @param masterFilename 主文件路径(fastdfs返回的file_id 去掉前面的group)
     * @param inputStream    从文件输入流
     * @param fileSize       从文件大小
     * @param prefixName     从文件前缀
     * @param fileExtName    主文件扩展名
     * @return 文件存储路径
     */
    StorePath uploadSlaveFile(String groupName, String masterFilename, InputStream inputStream, long fileSize, String prefixName, String fileExtName);
    
    /**
     * 上传从文件
     *
     * @param masterFileId 主文件路径(fastdfs返回的file_id,包含前面的group)
     * @param inputStream  从文件输入流
     * @param fileSize     从文件大小
     * @param prefixName   从文件前缀
     * @param fileExtName  主文件扩展名
     * @return
     */
    StorePath uploadSlaveFile(String masterFileId, InputStream inputStream, long fileSize, String prefixName, String fileExtName);
 
    /**
     * 获取文件元信息
     *
     * @param groupName 组名称
     * @param path      主文件路径
     * @return 获取文件元信息集合,不存在返回空集合
     */
    Set<MateData> getMetadata(String groupName, String path);
     
    /**
     * 获取文件元信息
     *
     * @param fileId 文件路径(fastdfs返回的file_id,包含前面的group)
     * @return
     */
    Set<MateData> getMetadata(String fileId);
 
    /**
     * 修改文件元信息(覆盖)
     *
     * @param groupName   组名称
     * @param path        主文件路径
     * @param metaDataSet 元信息集合
     */
    boolean overwriteMetadata(String groupName, String path, Set<MateData> metaDataSet);
     
    /**
     * 修改文件元信息(覆盖)
     *
     * @param fileId        文件路径(fastdfs返回的file_id,包含前面的group)
     * @param metaDataSet   元信息集合
     * @return
     */
    boolean overwriteMetadata(String fileId, Set<MateData> metaDataSet);
 
    /**
     * 修改文件元信息(合并)
     *
     * @param groupName   组名称
     * @param path        主文件路径
     * @param metaDataSet 元信息集合
     */
    boolean mergeMetadata(String groupName, String path, Set<MateData> metaDataSet);
     
    /**
     * 修改文件元信息(合并)
     *
     * @param fileId         文件路径(fastdfs返回的file_id,包含前面的group)
     * @param metaDataSet    元信息集合
     * @return
     */
    boolean mergeMetadata(String fileId, Set<MateData> metaDataSet);
 
    /**
     * 获取文件的信息
     *
     * @param groupName 组名称
     * @param path      主文件路径
     * @return 文件信息(不存在返回null)
     */
    FileInfo getFileInfo(String groupName, String path);
     
    /**
     * 获取文件信息
     *
     * @param fileId  文件路径(fastdfs返回的file_id,包含前面的group)
     * @return
     */
    FileInfo getFileInfo(String fileId);
 
    /**
     * 删除文件
     *
     * @param groupName 组名称
     * @param path      主文件路径
     */
    boolean deleteFile(String groupName, String path);
     
    /**
     * 删除文件
     *
     * @param fileId 文件路径(fastdfs返回的file_id,包含前面的group)
     * @return
     */
    boolean deleteFile(String fileId);
 
    /**
     * 下载整个文件
     *
     * @param groupName 组名称
     * @param path      主文件路径
     * @param callback  下载回调接口
     * @return 下载回调接口返回结果
     */
    <T> T downloadFile(String groupName, String path, DownloadCallback<T> callback);
     
    /**
     * 下载整个文件
     *
     * @param fileId   文件路径(fastdfs返回的file_id,包含前面的group)
     * @param callback 下载回调接口
     * @return
     */
    <T> T downloadFile(String fileId, DownloadCallback<T> callback);
 
    /**
     * 下载文件片段
     *
     * @param groupName  组名称
     * @param path       主文件路径
     * @param fileOffset 开始位置
     * @param fileSize   文件大小(经过测试好像这个参数值只能是“0”)
     * @param callback   下载回调接口
     * @return 下载回调接口返回结果
     */
    <T> T downloadFile(String groupName, String path, long fileOffset, long fileSize, DownloadCallback<T> callback);
     
    /**
     * 下载文件片段
     *
     * @param fileId     文件路径(fastdfs返回的file_id,包含前面的group)
     * @param fileOffset 开始位置
     * @param fileSize   文件大小(经过测试好像这个参数值只能是“0”)
     * @param callback   下载回调接口
     * @return
     */
    <T> T downloadFile(String fileId, long fileOffset, long fileSize, DownloadCallback<T> callback);
 
    // ----------------------------------------------------------------------------------------------------------------------------------------------------
 
    /**
     * 上传文件, 并设置文件元数据
     *
     * @param inputStream 文件输入流
     * @param fileSize    文件大小
     * @param fileExtName 文件扩展名
     * @param metaDataSet 元信息集合
     * @return 文件存储路径
     */
    StorePath uploadFile(InputStream inputStream, long fileSize, String fileExtName, Set<MateData> metaDataSet);
     
    /**
     * 上传文件, 并设置文件元数据
     *
     * @param groupName    组名
     * @param inputStream  文件输入流
     * @param fileSize     文件大小
     * @param fileExtName  文件扩展名
     * @param metaDataSet  元信息集合
     * @return
     */
    StorePath uploadFile(String groupName, InputStream inputStream, long fileSize, String fileExtName, Set<MateData> metaDataSet);
 
    /**
     * 文件上传(支持续传追加内容)
     *
     * @param groupName   组名称
     * @param inputStream 文件输入流(文件部分)
     * @param fileSize    文件大小
     * @param fileExtName 文件扩展名
     * @return 文件存储路径
     */
    StorePath uploadAppenderFile(String groupName, InputStream inputStream, long fileSize, String fileExtName);
 
    /**
     * 续传文件(追加内容)</br>
     * 从末尾追加内容</br>
     *
     * @param groupName   组名称
     * @param path        文件路径
     * @param inputStream 文件输入流(文件部分)
     * @param fileSize    文件大小
     * 
     */
    void appendFile(String groupName, String path, InputStream inputStream, long fileSize);
     
    /**
     * 续传文件(追加内容)</br>
     * 从末尾追加内容</br>
     *
     * @param fileId       文件路径(fastdfs返回的file_id,包含前面的group)
     * @param inputStream  文件输入流(文件部分)
     * @param fileSize     文件大小
     * 
     */
    void appendFile(String fileId, InputStream inputStream, long fileSize);
 
    /**
     * 修改文件内容的内容</br>
     * 从offset开始覆盖fileSize长度</br>
     * 报22参数错误,请检查offset是否超过文件长度</br>
     *
     * @param groupName   组名称
     * @param path        文件路径
     * @param inputStream 文件输入流
     * @param fileSize    文件大小
     * @param fileOffset  开始位置
     * 
     */
    void modifyFile(String groupName, String path, InputStream inputStream, long fileSize, long fileOffset);
     
    /**
     * 修改文件内容的内容</br>
     * 从offset开始覆盖fileSize长度</br>
     * 报22参数错误,请检查offset是否超过文件长度</br>
     *
     * @param fileId      文件路径(fastdfs返回的file_id,包含前面的group)
     * @param inputStream 文件输入流
     * @param fileSize    文件大小
     * @param fileOffset  开始位置
     * 
     */
    void modifyFile(String fileId, InputStream inputStream, long fileSize, long fileOffset);
 
    /**
     * 清除文件的内容
     *
     * @param groupName         组名称
     * @param path              文件路径
     * @param truncatedFileSize 截断文件大小
     * 
     */
    void truncateFile(String groupName, String path, long truncatedFileSize);
     
    /**
     * 清除文件的内容
     *
     * @param fileId            文件路径(fastdfs返回的file_id,包含前面的group)
     * @param truncatedFileSize 截断文件大小
     * 
     */
    void truncateFile(String fileId, long truncatedFileSize);
 
    /**
     * 清除文件的内容
     *
     * @param groupName 组名称
     * @param path      文件路径
     * 
     */
    void truncateFile(String groupName, String path);
     
    /**
     * 清除文件的内容
     *
     * @param fileId 文件路径(fastdfs返回的file_id,包含前面的group)
     * 
     */
    void truncateFile(String fileId);
}

ps.api我只放出了StorageClient的说明,TrackerClient的使用常规开发时用不到的,架构在进行调优的时候可能会使用到,所以这里就不做过多的解释

MybatisSql获取工具类SqlHelper使用说明

Published on:

项目地址

tsoft-common

License

前言

有的时候我们想在代码中获取Mybatis方法的sql但是又不想去实际执行Mybatis的查询方法,可以使用该工具直接得到sql。

Maven引入

<dependency>
  <groupId>cn.tsoft.framework</groupId>
  <artifactId>tsoft-common</artifactId>
  <version>1.0.0-SNAPSHOT</version>
</dependency>

目标

SqlHelper是获取Mybatis方法的sql工具包,支持mybatis mapper方式和sqlmap方式,支持参数:entity,map,array,list,这个工具不需要你实际去执行Mybatis的查询方法就能得到sql,方法主要分两大类,使用命名空间namespace调用或者使用Mapper接口方式调用。

测试方法

String sql = null;
UserEntity entity = new UserEntity();
entity.setUserId(1L);
entity.setPassword("sdflkjsldjf");
entity.setPasswordExpire(new Date());
entity.setVersion(2L);
List<Long> list = new ArrayList<Long>();
list.add(1L);
list.add(2L);
Long[] ids = new Long[]{1L,2L};
//方式一
sql = SqlHelper.getMapperSql(userMapper, "mobileIsExists", 1L, "13800138000");
System.out.println("方式一:参数为:@Param:"+sql);
sql = SqlHelper.getMapperSql(userMapper, "mobileIsExists");
System.out.println("方式一:参数为:无参:"+sql);
sql = SqlHelper.getMapperSql(userMapper, "modifyPassword", entity);
System.out.println("方式一:参数为:entity"+sql);
sql = SqlHelper.getMapperSql(userMapper, "blockedArrays", ids);
System.out.println("方式一:参数为:arrays"+sql);
sql = SqlHelper.getMapperSql(userMapper, "blockedList", list);
System.out.println("方式一:参数为:list"+sql);
 
SqlSession sqlSession = mybatisSessionFactory.getObject().openSession();
//方式二
sql = SqlHelper.getMapperSql(sqlSession, "cn.tsoft.account.mapper.UserMapper.mobileIsExists", 1L, "13800138000");
System.out.println("方式二:参数为:@Param:"+sql);
sql = SqlHelper.getMapperSql(sqlSession, "cn.tsoft.account.mapper.UserMapper.mobileIsExists");
System.out.println("方式二:参数为:无参:"+sql);
sql = SqlHelper.getMapperSql(sqlSession, "cn.tsoft.account.mapper.UserMapper.modifyPassword", entity);
System.out.println("方式二:参数为:entity"+sql);
sql = SqlHelper.getMapperSql(sqlSession, "cn.tsoft.account.mapper.UserMapper.blockedArrays", ids);
System.out.println("方式二:参数为:arrays"+sql);
sql = SqlHelper.getMapperSql(sqlSession, "cn.tsoft.account.mapper.UserMapper.blockedList", list);
System.out.println("方式二:参数为:list"+sql);
 
//方式三
sql = SqlHelper.getMapperSql(sqlSession, UserMapper.class, "mobileIsExists", 1L, "13800138000");
System.out.println("方式三:参数为:@Param:"+sql);
sql = SqlHelper.getMapperSql(sqlSession, UserMapper.class, "mobileIsExists");
System.out.println("方式三:参数为:无参:"+sql);
sql = SqlHelper.getMapperSql(sqlSession, UserMapper.class, "modifyPassword", entity);
System.out.println("方式三:参数为:entity"+sql);
sql = SqlHelper.getMapperSql(sqlSession, UserMapper.class, "blockedArrays", ids);
System.out.println("方式三:参数为:arrays"+sql);
sql = SqlHelper.getMapperSql(sqlSession, UserMapper.class, "blockedList", list);
System.out.println("方式三:参数为:list"+sql);

日志输出

方式一:参数为:@Param:
SELECT COUNT(t.`ID`) FROM t_user t 
WHERE t.`MOBILE` = '13800138000'
          
            AND t.`USER_ID` != '1'
方式一:参数为:无参:
SELECT COUNT(t.`ID`) FROM t_user t 
WHERE t.`MOBILE` = 'null'
方式一:参数为:entity:
UPDATE t_user t 
        SET 
        t.`PASSWORD` = 'sdflkjsldjf' , 
        t.`LAST_UPDATE_TIME` = CURRENT_TIMESTAMP , 
        t.`PASSWORD_MODIFY_TIME` = CURRENT_TIMESTAMP , 
        t.`PASSWORD_EXPIRE` = 'Fri Aug 25 19:36:00 CST 2017' , 
        t.`VERSION` = t.`VERSION` + 1 
        WHERE 
        t.`USER_ID` = 1 
        AND t.`VERSION` = 2
方式一:参数为:arrays:
UPDATE t_user t
        SET 
        t.`LAST_UPDATE_TIME` = CURRENT_TIMESTAMP , 
        t.`VERSION` = t.`VERSION` + 1 
        WHERE 
        t.`USER_ID` in
           
            1
         , 
            2
方式一:参数为:list:
UPDATE t_user t
        SET 
        t.`LAST_UPDATE_TIME` = CURRENT_TIMESTAMP , 
        t.`VERSION` = t.`VERSION` + 1 
        WHERE 
        t.`USER_ID` in
           
            1
         , 
            2
方式二:参数为:@Param:
SELECT COUNT(t.`ID`) FROM t_user t 
        WHERE t.`MOBILE` = '13800138000'
          
            AND t.`USER_ID` != '1'
方式二:参数为:无参:
SELECT COUNT(t.`ID`) FROM t_user t 
        WHERE t.`MOBILE` = 'null'
方式二:参数为:entity:
UPDATE t_user t 
        SET 
        t.`PASSWORD` = 'sdflkjsldjf' , 
        t.`LAST_UPDATE_TIME` = CURRENT_TIMESTAMP , 
        t.`PASSWORD_MODIFY_TIME` = CURRENT_TIMESTAMP , 
        t.`PASSWORD_EXPIRE` = 'Fri Aug 25 19:36:00 CST 2017' , 
        t.`VERSION` = t.`VERSION` + 1 
        WHERE 
        t.`USER_ID` = 1 
        AND t.`VERSION` = 2
方式二:参数为:arrays:
UPDATE t_user t
        SET 
        t.`LAST_UPDATE_TIME` = CURRENT_TIMESTAMP , 
        t.`VERSION` = t.`VERSION` + 1 
        WHERE 
        t.`USER_ID` in
           
            1
         , 
            2
方式二:参数为:list:
UPDATE t_user t
        SET 
        t.`LAST_UPDATE_TIME` = CURRENT_TIMESTAMP , 
        t.`VERSION` = t.`VERSION` + 1 
        WHERE 
        t.`USER_ID` in
           
            1
         , 
            2
方式三:参数为:@Param:
SELECT COUNT(t.`ID`) FROM t_user t 
        WHERE t.`MOBILE` = '13800138000'
          
            AND t.`USER_ID` != '1'
方式三:参数为:无参:
SELECT COUNT(t.`ID`) FROM t_user t 
        WHERE t.`MOBILE` = 'null'
方式三:参数为:entity:
UPDATE t_user t 
        SET 
        t.`PASSWORD` = 'sdflkjsldjf' , 
        t.`LAST_UPDATE_TIME` = CURRENT_TIMESTAMP , 
        t.`PASSWORD_MODIFY_TIME` = CURRENT_TIMESTAMP , 
        t.`PASSWORD_EXPIRE` = 'Fri Aug 25 19:36:00 CST 2017' , 
        t.`VERSION` = t.`VERSION` + 1 
        WHERE 
        t.`USER_ID` = 1 
        AND t.`VERSION` = 2
方式三:参数为:arrays:
UPDATE t_user t
        SET 
        t.`LAST_UPDATE_TIME` = CURRENT_TIMESTAMP , 
        t.`VERSION` = t.`VERSION` + 1 
        WHERE 
        t.`USER_ID` in
           
            1
         , 
            2
方式三:参数为:list:
UPDATE t_user t
        SET 
        t.`LAST_UPDATE_TIME` = CURRENT_TIMESTAMP , 
        t.`VERSION` = t.`VERSION` + 1 
        WHERE 
        t.`USER_ID` in
           
            1
         , 
            2

JMS实现参数的集中式管理

Published on:
Tags: JMS UCM

点评

虽然现在开源的UCM套件很多,UCM统一配置管理(百度的disconf、阿里的diamond、点评的lion,等很多开源的)。但是很多人是知其然不知其所以然,刚好发现下面这篇文章可以作为原理的教程文章,使用JMS、Redis、Zookeeper简单的实现UCM基本功能,作为学习交流还是很不错的。

文章转自:https://my.oschina.net/OutOfMemory/blog/1510101 作者:@ksfzhaohui

前言

JMS的发布订阅机制也能实现类似的功能,集群节点通过订阅指定的节点,同时使用JMS对消息的过滤器功能,实现对指定参数的更新,本文将介绍通过JMS实现简单的参数集中式管理。

Maven引入

Spring相关的jar引入参考上一篇文章

<dependency>
    <groupId>javax.jms</groupId>
    <artifactId>jms</artifactId>
    <version>1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.10.0</version>
</dependency>

目标

  1. 可以同时配置监听多个节点如/app1,/app2;
  2. 希望只需要配置如/app1,就能够监听其子节点如/app1/modual1以及子节点的子节点如/app1/modual1/xxx/…;
  3. 服务器启动能获取当前指定父节点下的所有子节点数据;
  4. 在添加节点或者在更新节点数据的时候能够动态通知,这样代码中就能够实时获取最新的数据;
  5. spring配置中可以从Zookeeper中读取参数进行初始化。

虽然在实现的方式上有点区别,但是最终达成的目标是一致的,同样列出了这5条目标

实现

MQWatcher主要用来和JMS建立连接,同时订阅指定节点,建立点对点连接,过滤出需要监听的数据,更新数据,初始化数据,存储数据等 InitConfServer主要作为点对点连接的服务器端用来初始化数据

1.同时配置监听多个节点 提供一个字符串数组给用户用来添加需要监听的节点:

private String[] keyPatterns;

2.能够监听其子节点以及子节点的子节点 使用了一种和Zookeeper不一样的方式,JMS的方式是将所有的数据变更都发送到订阅者,然后订阅者通过过滤出需要的数据进行更新

/** MQ的过滤器 **/
private StringBuffer keyFilter = new StringBuffer();
 
private final String TOPIC = "dynamicConfTopic";
 
private void watcherPaths() throws JMSException {
    Topic topic = session.createTopic(TOPIC);
    MessageConsumer consumer = session.createConsumer(topic, keyFilter.toString());
    consumer.setMessageListener(new MessageListener() {
 
        @Override
        public void onMessage(Message message) {
            try {
                String key = message.getStringProperty(IDENTIFIER);
                TextMessage tm = (TextMessage) message;
                keyValueMap.put(key, tm.getText());
                LOGGER.info("key = " + key + ",value = " + tm.getText());
            } catch (JMSException e) {
                LOGGER.error("onMessage error", e);
            }
        }
    });
}

对TOPIC进行了订阅,并且指定了过滤器keyFilter,keyFilter正是基于keyPatterns组装而成的

private final String IDENTIFIER = "confKey";
 
/**
* 生成接受过滤器
*/
private void generateKeyFilter() {
    for (int i = 0; i < keyPatterns.length; i++) {
        keyFilter.append(IDENTIFIER + " LIKE '" + keyPatterns[i] + "%'");
        if (i < keyPatterns.length - 1) {
            keyFilter.append(" OR ");
        }
    }
    LOGGER.info("keyFilter : " + keyFilter.toString());
}

对指定的属性IDENTIFIER,通过LIKE和OR关键字进行过滤

3.服务器启动初始化节点数据 通过点对点的方式,在服务器启动时通过请求响应模式来获取初始化数据

private final String QUEUE = "dynamicConfQueue";
 
/**
 * 初始化key-value值
 * 
 * @throws JMSException
 */
private void initKeyValues() throws JMSException {
    TemporaryQueue responseQueue = null;
    MessageProducer producer = null;
    MessageConsumer consumer = null;
    Queue queue = queueSession.createQueue(QUEUE);
 
    TextMessage requestMessage = queueSession.createTextMessage();
    requestMessage.setText(generateKeyString());
    responseQueue = queueSession.createTemporaryQueue();
    producer = queueSession.createProducer(queue);
    consumer = queueSession.createConsumer(responseQueue);
    requestMessage.setJMSReplyTo(responseQueue);
    producer.send(requestMessage);
 
    MapMessage receiveMap = (MapMessage) consumer.receive();
    @SuppressWarnings("unchecked")
    Enumeration<String> mapNames = receiveMap.getPropertyNames();
    while (mapNames.hasMoreElements()) {
        String key = mapNames.nextElement();
        String value = receiveMap.getStringProperty(key);
        keyValueMap.put(key, value);
        LOGGER.info("init key = " + key + ",value = " + value);
    }
}

通过对指定QUEUE请求,同时建立一个临时的响应QUEUE,然后接受一个MapMessage,用来初始化keyValueMap

4.监听节点数据的变更 通过发布订阅模式,接受所有数据,然后进行过滤,目标2中已经有相关实现

5.spring配置中可以从Zookeeper中读取参数进行初始化

public class MQPropPlaceholderConfigurer extends PropertyPlaceholderConfigurer {
 
    private MQWatcher mqwatcher;
 
    @Override
    protected Properties mergeProperties() throws IOException {
        return loadPropFromMQ(super.mergeProperties());
    }
 
    /**
     * 从MQ中加载配置的常量
     * 
     * @param result
     * @return
     */
    private Properties loadPropFromMQ(Properties result) {
        mqwatcher.watcherKeys();
        mqwatcher.fillProperties(result);
        return result;
    }
}

通过以上的处理,可以使用如下简单的配置来达到目标:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">
    <bean id="person" class="zh.maven.DynamicConf.Person">
        <property name="name">
            <value>${/a2/m1}</value>
        </property>
        <property name="address">
            <value>${/a3/m1/v2}</value>
        </property>
        <property name="company">
            <value>${/a3/m1/v2/t2}</value>
        </property>
    </bean>
    <bean id="mqwatcher" class="zh.maven.DynamicConf.mq.MQWatcher">
        <property name="keyPatterns" value="/a2,/a3" />
    </bean>
    <bean id="propertyConfigurer" class="zh.maven.DynamicConf.mq.MQPropPlaceholderConfigurer">
        <property name="mqwatcher" ref="mqwatcher"></property>
    </bean>
</beans>

测试

1.启动ActiveMQ

activemq.bat

2.InitConfServer启动 用来监听集群节点的初始化请求,获取到集群节点发送来的keyPatterns,然后将符合其模式的数据封装成MapMessage发送给集群节点

@Override
public void onMessage(Message message) {
    try {
        TextMessage receiveMessage = (TextMessage) message;
        String keys = receiveMessage.getText();
        LOGGER.info("keys = " + keys);
        MapMessage returnMess = session.createMapMessage();
        returnMess.setStringProperty("/a2/m1", "zhaohui");
        returnMess.setStringProperty("/a3/m1/v2", "nanjing");
        returnMess.setStringProperty("/a3/m1/v2/t2", "zhaohui");
 
        QueueSender sender = session.createSender((Queue) message.getJMSReplyTo());
        sender.send(returnMess);
    } catch (Exception e) {
        LOGGER.error("onMessage error", e);
    }
}

以上代码只是进行了简单的模拟,提供了一个思路

3.启动Main类

public class Main {
 
    public static void main(String[] args) throws Exception {
        ApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "spring-config.xml" });
        Person person = (Person) context.getBean("person");
        System.out.println(person.toString());
        }
}

4.启动TopicPublisher 定时发布数据,同时查看集群节点的Main类日志输出

public class TopicPublisher {
    private static final String TOPIC = "dynamicConfTopic";
    private static final String IDENTIFIER = "confKey";
 
    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = factory.createConnection();
        connection.start();
 
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(TOPIC);
 
        MessageProducer producer = session.createProducer(topic);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
 
        int i=1;
        while (true) {
            TextMessage message = session.createTextMessage();
            message.setStringProperty(IDENTIFIER, "/a2/"+i);
            message.setText("message_" + System.currentTimeMillis());
            producer.send(message);
            System.out.println("Sent message: " + message.getText());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            i++;
        }
    }
}

日志输出如下:

2017-08-14 21:52:23 - keyFilter : confKey LIKE '/a2%' OR confKey LIKE '/a3%'
2017-08-14 21:52:24 - init key = /a3/m1/v2/t2,value = zhaohui
2017-08-14 21:52:24 - init key = /a3/m1/v2,value = nanjing
2017-08-14 21:52:24 - init key = /a2/m1,value = zhaohui
2017-08-14 21:52:24 - Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@223dd567: defining beans [person,mqwatcher,propertyConfigurer]; root of factory hierarchy
name = zhaohui,address = nanjing,company = zhaohui
2017-08-14 21:52:33 - key = /a2/1,value = message_1502718753819
2017-08-14 21:52:35 - key = /a2/2,value = message_1502718755832
2017-08-14 21:52:37 - key = /a2/3,value = message_1502718757846
2017-08-14 21:52:39 - key = /a2/4,value = message_1502718759860
2017-08-14 21:52:41 - key = /a2/5,value = message_1502718761876

总结

通过JMS实现了一个简单的参数化平台系统,当然想在生产中使用还有很多需要优化的地方,本文在于提供一个思路;后续有时间准备对DynamicConf提供更加完善的方案。

Zookeeper实现参数的集中式管理

Published on:
Tags: Zookeeper UCM

点评

虽然现在开源的UCM套件很多,UCM统一配置管理(百度的disconf、阿里的diamond、点评的lion,等很多开源的)。但是很多人是知其然不知其所以然,刚好发现下面这篇文章可以作为原理的教程文章,使用JMS、Redis、Zookeeper简单的实现UCM基本功能,作为学习交流还是很不错的。

文章转自:https://my.oschina.net/OutOfMemory/blog/1503392 作者:@ksfzhaohui

前言

应用项目中都会有一些参数,一般的做法通常可以选择将其存储在本地配置文件或者内存变量中;对于集群机器规模不大、配置变更不是特别频繁的情况下,这两种方式都能很好的解决;但是一旦集群机器规模变大,且配置信息越来越频繁,依靠这两种方式就越来越困难;我们希望能够快速的做到全局参数的变更,因此需要一种参数的集中式管理,下面利用Zookeeper的一些特性来实现简单的参数管理。

准备

jdk:1.7.0_80
zookeeper:3.4.3
curator:2.6.0
spring:3.1.2

Maven引入

Spring相关的jar引入参考上一篇文章

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-core</artifactId>
    <version>3.1.2.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>3.1.2.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-beans</artifactId>
    <version>3.1.2.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.3</version>
    <exclusions>
        <exclusion>
            <groupId>com.sun.jmx</groupId>
            <artifactId>jmxri</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.sun.jdmk</groupId>
            <artifactId>jmxtools</artifactId>
        </exclusion>
        <exclusion>
            <groupId>javax.jms</groupId>
            <artifactId>jms</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.6.0</version>
</dependency>

目标

  1. 可以同时配置监听多个节点如/app1,/app2;
  2. 希望只需要配置如/app1,就能够监听其子节点如/app1/modual1以及子节点的子节点如/app1/modual1/xxx/…;
  3. 服务器启动能获取当前指定父节点下的所有子节点数据;
  4. 在添加节点或者在更新节点数据的时候能够动态通知,这样代码中就能够实时获取最新的数据;
  5. spring配置中可以从Zookeeper中读取参数进行初始化。

实现

提供ZKWatcher类主要用来和Zookeeper建立连接,监听节点,初始化节点数据,更新节点数据,存储节点数据等

1.同时配置监听多个节点 提供一个字符串数组给用户用来添加需要监听的节点:

private String[] keyPatterns;

2.能够监听其子节点以及子节点的子节点 使用递归的方式用来获取指定监听节点的子节点:

private List<String> listChildren(String path) throws Exception {
    List<String> pathList = new ArrayList<String>();
    pathList.add(path);
    List<String> list = client.getChildren().forPath(path);
    if (list != null && list.size() > 0) {
        for (String cPath : list) {
            String temp = "";
            if ("/".equals(path)) {
                temp = path + cPath;
            } else {
                temp = path + "/" + cPath;
            }
            pathList.addAll(listChildren(temp));
        }
    }
    return pathList;
}

3.服务器启动初始化节点数据 上面已经递归获取了所有的节点,所有可以遍历获取所有节点数据,并且存储在Map中:

private Map<String, String> keyValueMap = new ConcurrentHashMap<String, String>();
 
if (pathList != null && pathList.size() > 0) {
    for (String path : pathList) {
        keyValueMap.put(path, readPath(path));
        watcherPath(path);
    }
}
 
private String readPath(String path) throws Exception {
    byte[] buffer = client.getData().forPath(path);
    String value = new String(buffer);
    logger.info("readPath:path = " + path + ",value = " + value);
    return value;
}

4.监听节点数据的变更 使用PathChildrenCache用来监听子节点的CHILD_ADDED,CHILD_UPDATED,CHILD_REMOVED事件:

private void watcherPath(String path) {
    PathChildrenCache cache = null;
    try {
        cache = new PathChildrenCache(client, path, true);
        cache.start(StartMode.POST_INITIALIZED_EVENT);
        cache.getListenable().addListener(new PathChildrenCacheListener() {
 
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                switch (event.getType()) {
                case CHILD_ADDED:
                    logger.info("CHILD_ADDED," + event.getData().getPath());
                    watcherPath(event.getData().getPath());
                    keyValueMap.put(event.getData().getPath(), new String(event.getData().getData()));
                    break;
                case CHILD_UPDATED:
                    logger.info("CHILD_UPDATED," + event.getData().getPath());
                    keyValueMap.put(event.getData().getPath(), new String(event.getData().getData()));
                    break;
                case CHILD_REMOVED:
                    logger.info("CHILD_REMOVED," + event.getData().getPath());
                    break;
                default:
                    break;
                }
            }
        });
    } catch (Exception e) {
        if (cache != null) {
            try {
                cache.close();
            } catch (IOException e1) {
            }
        }
        logger.error("watch path error", e);
    }
}

5.spring配置中可以从Zookeeper中读取参数进行初始化 实现自定义的PropertyPlaceholderConfigurer类ZKPropPlaceholderConfigurer:

public class ZKPropPlaceholderConfigurer extends PropertyPlaceholderConfigurer {
 
    private ZKWatcher zkwatcher;
 
    @Override
    protected Properties mergeProperties() throws IOException {
        return loadPropFromZK(super.mergeProperties());
    }
 
    /**
     * 从zk中加载配置的常量
     * 
     * @param result
     * @return
     */
    private Properties loadPropFromZK(Properties result) {
        zkwatcher.watcherKeys();
        zkwatcher.fillProperties(result);
        return result;
    }
    ......
}

通过以上的处理,可以使用如下简单的配置来达到目标:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">
 
    <bean id="zkwatcher" class="zh.maven.DynamicConf.ZKWatcher">
        <property name="keyPatterns" value="/a2,/a3/m1" />
    </bean>
 
    <bean id="propertyConfigurer" class="zh.maven.DynamicConf.ZKPropPlaceholderConfigurer">
        <property name="zkwatcher" ref="zkwatcher"></property>
    </bean>
 
    <bean id="person" class="zh.maven.DynamicConf.Person">
        <property name="name">
            <value>${/a2/m1}</value>
        </property>
        <property name="address">
            <value>${/a3/m1/v2}</value>
        </property>
        <property name="company">
            <value>${/a3/m1/v2/t2}</value>
        </property>
    </bean>
</beans>

测试

1.首先启动Zookeeper

zkServer.cmd

2.初始化需要使用的节点

public class Create_Node {
 
    static String path = "/a3/m1/v2/t2";
    static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("127.0.0.1:2181").sessionTimeoutMs(5000)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
 
    public static void main(String[] args) throws Exception {
        client.start();
        client.create().creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath(path, "init".getBytes());
    }
}

创建需要的节点方便ZKWatcher来监听,这里根据以上的配置,分别初始化/a3/m1/v2/t2和/a2/m1/v1/t1

3.启动Main,分别验证配置文件中的初始化以及代码动态获取参数

public class Main {
 
    public static void main(String[] args) throws Exception {
        ApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "spring-config.xml" });
        Person person = (Person) context.getBean("person");
        System.out.println(person.toString());
 
        ZKWatcher zkwatcher = (ZKWatcher) context.getBean("zkwatcher");
        while (true) {
            Person p = new Person(zkwatcher.getKeyValue("/a2/m1"), zkwatcher.getKeyValue("/a3/m1/v2"),
                    zkwatcher.getKeyValue("/a3/m1/v2/t2"));
            System.out.println(p.toString());
 
            Thread.sleep(1000);
        }
    }
}

4.观察日志同时更新参数:

public class Set_Data {
 
    static String path = "/a3/m1/v2/t2";
    static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
            .sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
 
    public static void main(String[] args) throws Exception {
        client.start();
        Stat stat = new Stat();
        System.out.println(stat.getVersion());
        System.out.println("Success set node for :" + path + ",new version:"
                + client.setData().forPath(path, "codingo_v2".getBytes()).getVersion());
    }
}

部分日志如下:

2017-08-05 18:04:57 - watcher path : [/a2, /a2/m1, /a2/m1/v1, /a2/m1/v1/t2, /a3/m1, /a3/m1/v2, /a3/m1/v2/t2]
2017-08-05 18:04:57 - readPath:path = /a2,value = 
2017-08-05 18:04:57 - readPath:path = /a2/m1,value = zhaohui
2017-08-05 18:04:57 - readPath:path = /a2/m1/v1,value = 
2017-08-05 18:04:57 - readPath:path = /a2/m1/v1/t2,value = init
2017-08-05 18:04:57 - readPath:path = /a3/m1,value = 
2017-08-05 18:04:57 - readPath:path = /a3/m1/v2,value = nanjing
2017-08-05 18:04:57 - readPath:path = /a3/m1/v2/t2,value = codingo_v10
2017-08-05 18:04:57 - Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@182f4aea: defining beans [zkwatcher,propertyConfigurer,person]; root of factory hierarchy
name = zhaohui,address = nanjing,company = codingo_v10
name = zhaohui,address = nanjing,company = codingo_v10
2017-08-05 18:04:57 - CHILD_ADDED,/a2/m1
2017-08-05 18:04:57 - CHILD_ADDED,/a3/m1/v2
2017-08-05 18:04:57 - CHILD_ADDED,/a2/m1/v1
2017-08-05 18:04:57 - CHILD_ADDED,/a2/m1/v1/t2
2017-08-05 18:04:57 - CHILD_ADDED,/a3/m1/v2/t2
name = zhaohui,address = nanjing,company = codingo_v10
name = zhaohui,address = nanjing,company = codingo_v10
name = zhaohui,address = nanjing,company = codingo_v10
2017-08-05 18:05:04 - CHILD_UPDATED,/a3/m1/v2/t2
name = zhaohui,address = nanjing,company = codingo_v11
name = zhaohui,address = nanjing,company = codingo_v11

总结

通过Zookeeper实现了一个简单的参数化平台,当然想在生产中使用还有很多需要优化的地方,本文在于提供一个思路;当然除了Zookeeper还可以使用MQ,分布式缓存等来实现参数化平台。

[转]Redis实现参数的集中式管理

Published on:
Tags: Redis UCM

点评

虽然现在开源的UCM套件很多,UCM统一配置管理(百度的disconf、阿里的diamond、点评的lion,等很多开源的)。但是很多人是知其然不知其所以然,刚好发现下面这篇文章可以作为原理的教程文章,使用JMS、Redis、Zookeeper简单的实现UCM基本功能,作为学习交流还是很不错的。

文章转自:https://my.oschina.net/OutOfMemory/blog/1526063 作者:@ksfzhaohui

前言

利用的Redis的发布订阅功能实现对参数的集中式管理;分布式缓存Redis提供了类似的发布订阅功能,并且Redis本身提供了缓存和持久化的功能,本文将介绍通过Redis实现简单的参数集中式管理。

Maven引入

Spring相关的jar引入参考上一篇文章

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.4.0</version>
</dependency>

目标

  1. 可以同时配置监听多个节点如/app1,/app2;
  2. 希望只需要配置如/app1,就能够监听其子节点如/app1/modual1以及子节点的子节点如/app1/modual1/xxx/…;
  3. 服务器启动能获取当前指定父节点下的所有子节点数据;
  4. 在添加节点或者在更新节点数据的时候能够动态通知,这样代码中就能够实时获取最新的数据;
  5. spring配置中可以从Zookeeper中读取参数进行初始化。

虽然在实现的方式上有点区别,但是最终达成的目标是一致的,同样列出了这5条目标

实现

RedisWatcher主要用来和Redis进行连接,然后对监听的节点进行初始化,模糊订阅需要监听的节点,最后接受数据的变更,更新本地数据,存储数据等。

1.同时配置监听多个节点 提供一个字符串数组给用户用来添加需要监听的节点:

private String[] keyPatterns;

2.能够监听其子节点以及子节点的子节点 使用Redis提供的psubscribe命令,订阅一个或多个符合给定模式的频道,提供了模糊订阅的功能

private void watcherPaths() {
    new Thread(new Runnable() {
 
        @Override
        public void run() {
            jedis.psubscribe(new JedisPubSub() {
 
                @Override
                public void onMessage(String channel, String message) {
                    try {
                        keyValueMap.put(channel, message);
                        LOGGER.info("key = " + channel + ",value = " + message);
                    } catch (Exception e) {
                        LOGGER.error("onMessage error", e);
                    }
                }
 
                @Override
                public void onPMessage(String arg0, String arg1, String arg2) {
                    System.out.println("onPMessage=>" + arg0 + "=" + arg1 + "="
                            + arg2);
                }
 
                @Override
                public void onPSubscribe(String pattern, int subscribedChannels) {
                    LOGGER.info("onPSubscribe=>" + pattern + "=" + subscribedChannels);
                }
 
                @Override
                public void onPUnsubscribe(String arg0, int arg1) {
                }
 
                @Override
                public void onSubscribe(String arg0, int arg1) {
                }
 
                @Override
                public void onUnsubscribe(String arg0, int arg1) {
                }
            }, getSubKeyPatterns());
        }
    }).start();
}

3.服务器启动初始化节点数据 通过使用keys命令来获取匹配节点的数据(keys命令可能引发性能问题,根据实际情况使用)

private void initKeyValues() {
    for (String keyPattern : keyPatterns) {
        Set<String> keys = jedis.keys(keyPattern + "*");
        for (String key : keys) {
            String value = jedis.get(key);
            keyValueMap.put(key, value);
            LOGGER.info("init key = " + key + ",value = " + value);
        }
    }
}

4.监听节点数据的变更 目标2中通过psubscribe命令,使用模糊订阅来监听数据的变更,onMessage用来接受变更的数据

5.spring配置中可以从Redis中读取参数进行初始化

public class RedisPropPlaceholderConfigurer extends PropertyPlaceholderConfigurer {
 
    private RedisWatcher rediswatcher;
 
    @Override
    protected Properties mergeProperties() throws IOException {
        return loadPropFromRedis(super.mergeProperties());
    }
 
    /**
     * 从Redis中加载配置的常量
     * 
     * @param result
     * @return
     */
    private Properties loadPropFromRedis(Properties result) {
        rediswatcher.watcherKeys();
        rediswatcher.fillProperties(result);
        return result;
    }
}

通过以上的处理,可以使用如下简单的配置来达到目标:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">
    <bean id="rediswatcher" class="zh.maven.DynamicConf.redis.RedisWatcher">
        <property name="keyPatterns" value="/a2,/a3" />
    </bean>
    <bean id="propertyConfigurer" class="zh.maven.DynamicConf.redis.RedisPropPlaceholderConfigurer">
        <property name="rediswatcher" ref="rediswatcher"></property>
    </bean>
    <bean id="person" class="zh.maven.DynamicConf.Person">
        <property name="name">
            <value>${/a2/m1}</value>
        </property>
        <property name="address">
            <value>${/a3/m1/v2}</value>
        </property>
        <property name="company">
            <value>${/a3/m1/v2/t2}</value>
        </property>
    </bean>
</beans>

测试

1.启动Redis服务器

redis-server.exe

2.启动Redis客户端进行初始化数据

redis-cli.exe
redis 127.0.0.1:6379> set /a2/m1 zhaohui
OK
redis 127.0.0.1:6379> set /a3/m1/v2 nanjing
OK
redis 127.0.0.1:6379> set /a3/m1/v2/t2 codingo
OK

3.启动Main类

public class Main {
  
    public static void main(String[] args) throws Exception {
        ApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "spring-config.xml" });
        Person person = (Person) context.getBean("person");
        System.out.println(person.toString());
        }
}

4.启动RedisPublish 定时发布数据,同时查看集群节点的Main类日志输出

public class RedisPublish {
 
    public static void main(String[] args) {
        Jedis jedis = new Jedis("127.0.0.1", 6379);
        int i = 0;
        while (true) {
            jedis.publish("/a2/b4/c1" + i, "message_" + System.currentTimeMillis());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            i++;
        }
    }
}

日志输出如下:

2017-08-30 10:44:00 - init key = /a2/m1,value = zhaohui
2017-08-30 10:44:00 - init key = /a3/m1/v2,value = nanjing
2017-08-30 10:44:00 - init key = /a3/m1/v2/t2,value = codingo
2017-08-30 10:44:00 - onPSubscribe=>/a2*=1
2017-08-30 10:44:00 - onPSubscribe=>/a3*=2
2017-08-30 10:44:00 - Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@4bad4a49: defining beans [rediswatcher,propertyConfigurer,person]; root of factory hierarchy
name = zhaohui,address = nanjing,company = codingo
onPMessage=>/a2*=/a2/b4/c10=message_1504061045414
onPMessage=>/a2*=/a2/b4/c11=message_1504061047458
onPMessage=>/a2*=/a2/b4/c12=message_1504061049458
onPMessage=>/a2*=/a2/b4/c13=message_1504061051458

总结

关于参数的集中式管理一共写了三篇文章,分别利用Zookeeper,MQ以及Redis来实现了一个简单的参数的集中式管理,但更多的只是提供了一个思路 离生产还有很大距离,本片文章也是这个系列的最后一篇,综合来看Zookeeper更加适合做参数的集中式管理平台,MQ方式本身没有提供存储的功能 只能作为一个中间层存在;而Redis方式虽然提供了持久化功能,但是会因为选择不同的持久化方式会出现丢数据的可能,还有就是本身的集群方式 并不是很完善;虽然Zookeeper本身并不是一个存储系统,但是紧紧用来存储少量的参数应该足够了。

Spring框架-事务管理注意事项

Published on:

常见事务问题

  1. 事务不起作用
    • 可能是配置不起效,如扫描问题
  2. 事务自动提交了(批量操作中)
    • 可能是在没事务的情况下,利用了数据库的隐式提交

事务配置说明

通常情况下我们的Spring Component扫描分为两部分,一部分是Spring Servlet(MVC),一部分是其他Context Config的内容。主要扫描Annotation定义,包括@Controller@Autowired@Resource@Service@Component@Repository等。

Spring Servlet部分的扫描配置可以通过web.xmlDispatchServletinit-param节点配置确定。

Context Config部分的扫描配置为非以上配置的其他Spring配置文件确定。

为了能够使用事务,需要防止因Spring Servlet的扫描导致@Service事务配置失效。可以调整DispatchServlet中的配置文件,排除对@Service的扫描。

配置如下:

<context:component-scan base-package="com.jiuyescm.xxx">
	<context:exclude-filter type="annotation" expression="org.springframework.stereotype.Service" />
</context:component-scan>

如何通过日志判断事务是否已经被Spring所管理?

  1. 在logback或者log4j中对org.springframework.aop、org.springframework.transaction、org.springframework.jdbc、org.mybatis.spring.transaction进行DEBUG级别日志跟踪(开发期)
  2. 查看日志中是否有事务管理、开启、提交、回滚等字符,如:

    DEBUG o.m.spring.transaction.SpringManagedTransaction - JDBC Connection [com.alibaba.druid.proxy.jdbc.ConnectionProxyImpl@28cfe912] will be managed by Spring
    
  3. 没有被控制的时候,日志如下:

    DEBUG o.m.spring.transaction.SpringManagedTransaction - JDBC Connection [com.alibaba.druid.proxy.jdbc.ConnectionProxyImpl@28cfe912] will not be managed by Spring
    

如何通过程序判断是否存在事务?

boolean flag = TransactionSynchronizationManager.isActualTransactionActive();

返回true,则在事务控制下,否则不在控制下

什么时候做了隐式提交?

在没有容器事务的情况下,系统会尝试隐时提交。

spring1

开发建议:

  1. 所有Service代码中设置Class级别的@Transactional,并设置为只读,开发时可以很容易发现误数据库操作的动作。如:@Transactional(readOnly=true)。
  2. 所有Service代码中Public的方法设置@Transactional,并根据实际情况设置Propagation,可以设置为REQUIRED。
  3. 对于有异常产生可能的情况下,根据情况选择合适的rollbackFor,默认情况下可以设置对Exception.class或BizException.class进行控制。
  4. 尽可能减少嵌套的使用方法(Service call Service),采用传统的Controller-》Service-》Repository(DAO)的模型。

如果需要深入了解Transaction的流程,请自行翻阅和跟踪Spring和Mybatis相关代码。

以下是嵌套事务的各种情况下的执行结果(前提数据库的AutoCommit为true)

编号 External(Service) Internal(Service) Result Memo
1 No Transactional No Transactional All Committed Auto Commit = True
2 No Transactional Class Level ReadOnly Transactional External Committed Internal TransientDataAccessResourceException Can’t update table
3 No Transactional Transactional(REQUIRED) All Committed
4 No Transactional Transactional(REQUIRES_NEW) All Committed
5 No Transactional Transactional(SUPPORTS) All Committed
6 No Transactional Transactional(MANDATORY) External Committed Internal IllegalTransactionStateException Must under transaction
7 No Transactional Transactional(NOT_SUPPORTED) All Committed
8 No Transactional Transactional(NEVER) All Committed
9 No Transactional Transactional(NESTED) All Committed
10 No Transactional Transactional(REQUIRED) rollackFor=Exception.class IOException External Committed Internal Rollbacked
11 No Transactional Transactional(REQUIRED) rollbackFor=RuntimeException.class IOException All Committed
12 No Transactional Transactional(REQUIRED) rollbackFor=Exception.class RuntimeException External Committed Internal Rollbacked
13 No Transactional Transactional(REQUIRED) rollbackFor=RuntimeException.class RuntimeException External Committed Internal Rollbacked
14 Class Level ReadOnly Transactional No Transactional External TransientDataAccessResourceException Can’t update table
15 Class Level ReadOnly Transactional Class Level ReadOnly Transactional External TransientDataAccessResourceException Can’t update table
16 Transactional(REQUIRED) No Transactional All Committed
17 Transactional(REQUIRES_NEW) No Transactional All Committed
18 Transactional(SUPPORTS) No Transactional All Committed
19 Transactional(MANDATORY) No Transactional External IllegalTransactionStateException Must under transaction
20 Transactional(NOT_SUPPORTED) No Transactional All Committed
21 Transactional(NEVER) No Transactional All Committed
22 Transactional(NESTED) No Transactional All Committed
23 Transactional(REQUIRED) Transactional(REQUIRED) All Committed
24 Transactional(REQUIRED) Transactional(REQUIRES_NEW) All Committed
25 Transactional(REQUIRED) Transactional(SUPPORTS) All Committed
26 Transactional(REQUIRED) Transactional(MANDATORY) All Committed
27 Transactional(REQUIRED) Transactional(NOT_SUPPORTED) All Committed
28 Transactional(REQUIRED) Transactional(NEVER) External Rollbacked Internal IllegalTransactionStateException Must under transaction
29 Transactional(REQUIRED) Transactional(NESTED) All Committed
30 Transactional(REQUIRED) rollackFor=Exception.class Transactional(REQUIRED) rollackFor=Exception.class IOException All Rollbacked
31 Transactional(REQUIRED) rollackFor=Exception.class Transactional(REQUIRED) rollbackFor=RuntimeException.class IOException All Rollbacked
32 Transactional(REQUIRED) rollackFor=RuntimeException.class Transactional(REQUIRED) rollackFor=Exception.class IOException All Rollbacked UnexpectedRollbackException
33 Transactional(REQUIRED) rollackFor=RuntimeException.class Transactional(REQUIRED) rollbackFor=RuntimeException.class IOException All Committed
34 Transactional(REQUIRED) rollackFor=Exception.class Transactional(REQUIRED) rollackFor=Exception.class RuntimeException All Rollbacked
35 Transactional(REQUIRED) rollackFor=Exception.class Transactional(REQUIRED) rollbackFor=RuntimeException.class RuntimeException All Rollbacked
36 Transactional(REQUIRED) rollackFor=RuntimeException.class Transactional(REQUIRED) rollackFor=Exception.class RuntimeException All Rollbacked
37 Transactional(REQUIRED) rollackFor=RuntimeException.class Transactional(REQUIRED) rollbackFor=RuntimeException.class RuntimeException All Rollbacked
38 Transactional(REQUIRED) rollackFor=Exception.class Transactional(REQUIRED) rollackFor=Exception.class IOException Catch IOException All Committed
39 Transactional(REQUIRED) rollackFor=Exception.class Catch IOExceptio Transactional(REQUIRED) rollbackFor=Exception.class IOException All Rollbacked UnexpectedRollbackException
40 Transactional(REQUIRED) rollackFor=Exception.class Catch IOException Transactional(REQUIRED) rollbackFor=RuntimeException.class IOException All Committed

其他情况按照事务是否开启和是否抛出(捕获)对应异常来判断结果。

NPE(java.lang.NullPointerException)防范

Published on:

我们程序中NPE还是比较多的,下面介绍良好的编码规范防止NPE的发生

NPE(java.lang.NullPointerException): 空指针异常

一、【推荐】防止 NPE,是程序员的基本修养,注意 NPE 产生的场景:

1) 返回类型为基本数据类型, return 包装数据类型的对象时,自动拆箱有可能产生 NPE。

反例: public int f() { return Integer 对象}, 如果为 null,自动解箱抛 NPE。

2) 数据库的查询结果可能为 null。

3) 集合里的元素即使 isNotEmpty,取出的数据元素也可能为 null。

4) 远程调用返回对象时,一律要求进行空指针判断,防止 NPE。

5) 对于 Session 中获取的数据,建议 NPE 检查,避免空指针。

6) 级联调用 obj.getA().getB().getC(); 一连串调用,易产生 NPE。

正例: 使用 JDK8 的 Optional 类来防止 NPE 问题。

ps.我们现在开发规范jdk版本jdk1.7.0_45,对于jdk8里面的optional可以了解学习,它是一种友好的解决方式。

二、【强制】当某一列的值全是 NULL 时, count(col)的返回结果为 0,但 sum(col)的返回结果为

NULL,因此使用 sum()时需注意 NPE 问题。

正例: 可以使用如下方式来避免 sum 的 NPE 问题: SELECT IF(ISNULL(SUM(g)),0,SUM(g))

FROM table;

三、【推荐】高度注意 Map 类集合 K/V 能不能存储 null 值的情况,如下表格:

集合类 Key Value Super 说明
Hashtable 不允许为null 不允许为null Dictionary 线程安全
ConcurrentHashMap 不允许为null 不允许为null AbstractMap 分段锁技术
TreeMap 不允许为null 允许为null AbstractMap 线程不安全
HashMap 允许为null 允许为null AbstractMap 线程不安全

反例: 由于 HashMap 的干扰,很多人认为 ConcurrentHashMap 是可以置入 null 值,而事实上,

存储 null 值时会抛出 NPE 异常。

四、【推荐】方法的返回值可以为 null,不强制返回空集合,或者空对象等,必须添加注释充分

说明什么情况下会返回 null 值。调用方需要进行 null 判断防止 NPE 问题。

说明: 明确防止 NPE 是调用者的责任。即使被调用方法返回空集合或者空对象,对调用

者来说,也并非高枕无忧,必须考虑到远程调用失败、 序列化失败、 运行时异常等场景返回

null 的情况。

五、关于基本数据类型与包装数据类型的使用标准如下:

1) 【强制】 所有的 POJO 类属性必须使用包装数据类型。

2) 【强制】 RPC 方法的返回值和参数必须使用包装数据类型。

3) 【 推荐】 所有的局部变量使用基本数据类型。

说明: POJO 类属性没有初值是提醒使用者在需要使用时,必须自己显式地进行赋值,任何

NPE 问题,或者入库检查,都由使用者来保证。

正例: 数据库的查询结果可能是 null,因为自动拆箱,用基本数据类型接收有 NPE 风险。

以上内容摘自阿里巴巴Java开发手册v1.2.0.pdf

JVM调优总结 -Xms -Xmx -Xmn -Xss

Published on:
Tags: JVM 调优

堆大小设置

JVM 中最大堆大小有三方面限制:相关操作系统的数据模型(32-bt还是64-bit)限制;系统的可用虚拟内存限制;系统的可用物理内存限制。32位系统下,一般限制在1.5G~2G;64为操作系统对内存无限制。我在Windows Server 2003 系统,3.5G物理内存,JDK5.0下测试,最大可设置为1478m。

典型设置

  • java -Xmx3550m -Xms3550m -Xmn2g -Xss128k -Xmx3550m:设置JVM最大可用内存为3550M。 -Xms3550m:设置JVM促使内存为3550m。此值可以设置与-Xmx相同,以避免每次垃圾回收完成后JVM重新分配内存。 -Xmn2g:设置年轻代大小为2G。整个JVM内存大小=年轻代大小 + 年老代大小 + 持久代大小。持久代一般固定大小为64m,所以增大年轻代后,将会减小年老代大小。此值对系统性能影响较大,Sun官方推荐配置为整个堆的3/8。 -Xss128k:设置每个线程的堆栈大小。JDK5.0以后每个线程堆栈大小为1M,以前每个线程堆栈大小为256K。更具应用的线程所需内存大小进行调整。在相同物理内存下,减小这个值能生成更多的线程。但是操作系统对一个进程内的线程数还是有限制的,不能无限生成,经验值在3000~5000左右。

  • java -Xmx3550m -Xms3550m -Xss128k -XX:NewRatio=4 -XX:SurvivorRatio=4 -XX:MaxPermSize=16m -XX:MaxTenuringThreshold=0 -XX:NewRatio=4:设置年轻代(包括Eden和两个Survivor区)与年老代的比值(除去持久代)。设置为4,则年轻代与年老代所占比值为1:4,年轻代占整个堆栈的1/5 -XX:SurvivorRatio=4:设置年轻代中Eden区与Survivor区的大小比值。设置为4,则两个Survivor区与一个Eden区的比值为2:4,一个Survivor区占整个年轻代的1/6 -XX:MaxPermSize=16m:设置持久代大小为16m。 -XX:MaxTenuringThreshold=0:设置垃圾最大年龄。如果设置为0的话,则年轻代对象不经过Survivor区,直接进入年老代。对于年老代比较多的应用,可以提高效率。如果将此值设置为一个较大值,则年轻代对象会在Survivor区进行多次复制,这样可以增加对象再年轻代的存活时间,增加在年轻代即被回收的概论。

回收器选择

JVM给了三种选择:串行收集器、并行收集器、并发收集器,但是串行收集器只适用于小数据量的情况,所以这里的选择主要针对并行收集器和并发收集器。默认情况下,JDK5.0以前都是使用串行收集器,如果想使用其他收集器需要在启动时加入相应参数。JDK5.0以后,JVM会根据当前系统配置进行判断。

  • java -Xmx3550m -Xms3550m -Xmn2g -Xss128k -XX:ParallelGCThreads=20 -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+UseConcMarkSweepGC:设置年老代为并发收集。测试中配置这个以后,-XX:NewRatio=4的配置失效了,原因不明。所以,此时年轻代大小最好用-Xmn设置。 -XX:+UseParNewGC:设置年轻代为并行收集。可与CMS收集同时使用。JDK5.0以上,JVM会根据系统配置自行设置,所以无需再设置此值。

  • java -Xmx3550m -Xms3550m -Xmn2g -Xss128k -XX:+UseConcMarkSweepGC -XX:CMSFullGCsBeforeCompaction=5 -XX:+UseCMSCompactAtFullCollection -XX:CMSFullGCsBeforeCompaction:由于并发收集器不对内存空间进行压缩、整理,所以运行一段时间以后会产生“碎片”,使得运行效率降低。此值设置运行多少次GC以后对内存空间进行压缩、整理。 -XX:+UseCMSCompactAtFullCollection:打开对年老代的压缩。可能会影响性能,但是可以消除碎片

  • java -Xmx3800m -Xms3800m -Xmn2g -Xss128k -XX:+UseParallelGC -XX:ParallelGCThreads=20 -XX:+UseParallelGC:选择垃圾收集器为并行收集器。此配置仅对年轻代有效。即上述配置下,年轻代使用并发收集,而年老代仍旧使用串行收集。 -XX:ParallelGCThreads=20:配置并行收集器的线程数,即:同时多少个线程一起进行垃圾回收。此值最好配置与处理器数目相等。

  • java -Xmx3550m -Xms3550m -Xmn2g -Xss128k -XX:+UseParallelGC -XX:ParallelGCThreads=20 -XX:+UseParallelOldGC -XX:+UseParallelOldGC:配置年老代垃圾收集方式为并行收集。JDK6.0支持对年老代并行收集。

  • java -Xmx3550m -Xms3550m -Xmn2g -Xss128k -XX:+UseParallelGC -XX:MaxGCPauseMillis=100 -XX:MaxGCPauseMillis=100:设置每次年轻代垃圾回收的最长时间,如果无法满足此时间,JVM会自动调整年轻代大小,以满足此值。

  • java -Xmx3550m -Xms3550m -Xmn2g -Xss128k -XX:+UseParallelGC -XX:MaxGCPauseMillis=100 -XX:+UseAdaptiveSizePolicy -XX:+UseAdaptiveSizePolicy:设置此选项后,并行收集器会自动选择年轻代区大小和相应的Survivor区比例,以达到目标系统规定的最低相应时间或者收集频率等,此值建议使用并行收集器时,一直打开。

    1. 吞吐量优先的并行收集器 如上文所述,并行收集器主要以到达一定的吞吐量为目标,适用于科学技术和后台处理等。
    2. 响应时间优先的并发收集器 如上文所述,并发收集器主要是保证系统的响应时间,减少垃圾收集时的停顿时间。适用于应用服务器、电信领域等。

辅助信息

JVM提供了大量命令行参数,打印信息,供调试使用。主要有以下一些:

  • -XX:+PrintGC 输出形式:

    [GC 118250K->113543K(130112K), 0.0094143 secs]
    [Full GC 121376K->10414K(130112K), 0.0650971 secs]
    
  • -XX:+PrintGCDetails 输出形式:

    [GC [DefNew: 8614K->781K(9088K), 0.0123035 secs] 118250K->113543K(130112K), 0.0124633 secs]
    [GC [DefNew: 8614K->8614K(9088K), 0.0000665 secs][Tenured: 112761K->10414K(121024K), 0.0433488 secs] 121376K->10414K(130112K), 0.0436268 secs]
    
  • -XX:+PrintGCTimeStamps -XX:+PrintGC:PrintGCTimeStamps可与上面两个混合使用 输出形式:

    [GC 98328K->93620K(130112K), 0.0082960 secs]
    
  • -XX:+PrintGCApplicationConcurrentTime:打印每次垃圾回收前,程序未中断的执行时间。可与上面混合使用 输出形式:

    Application time: 0.5291524 seconds
    
  • -XX:+PrintGCApplicationStoppedTime:打印垃圾回收期间程序暂停的时间。可与上面混合使用 输出形式:

    Total time for which application threads were stopped: 0.0468229 seconds
    
  • -XX:PrintHeapAtGC:打印GC前后的详细堆栈信息 输出形式:

    [GC {Heap before gc invocations=7:
    		 def new generation   total 55296K, used 52568K [0x1ebd0000, 0x227d0000, 0x227d0000)
    		eden space 49152K,  99% used [0x1ebd0000, 0x21bce430, 0x21bd0000)
    		from space 6144K,  55% used [0x221d0000, 0x22527e10, 0x227d0000)
    		  to   space 6144K,   0% used [0x21bd0000, 0x21bd0000, 0x221d0000)
    		 tenured generation   total 69632K, used 2696K [0x227d0000, 0x26bd0000, 0x26bd0000)
    		the space 69632K,   3% used [0x227d0000, 0x22a720f8, 0x22a72200, 0x26bd0000)
    		 compacting perm gen  total 8192K, used 2898K [0x26bd0000, 0x273d0000, 0x2abd0000)
    		   the space 8192K,  35% used [0x26bd0000, 0x26ea4ba8, 0x26ea4c00, 0x273d0000)
    		    ro space 8192K,  66% used [0x2abd0000, 0x2b12bcc0, 0x2b12be00, 0x2b3d0000)
    		    rw space 12288K,  46% used [0x2b3d0000, 0x2b972060, 0x2b972200, 0x2bfd0000)
    		34.735: [DefNew: 52568K->3433K(55296K), 0.0072126 secs] 55264K->6615K(124928K)Heap after gc invocations=8:
    		 def new generation   total 55296K, used 3433K [0x1ebd0000, 0x227d0000, 0x227d0000)
    		eden space 49152K,   0% used [0x1ebd0000, 0x1ebd0000, 0x21bd0000)
    		  from space 6144K,  55% used [0x21bd0000, 0x21f2a5e8, 0x221d0000)
    		  to   space 6144K,   0% used [0x221d0000, 0x221d0000, 0x227d0000)
    		 tenured generation   total 69632K, used 3182K [0x227d0000, 0x26bd0000, 0x26bd0000)
    		the space 69632K,   4% used [0x227d0000, 0x22aeb958, 0x22aeba00, 0x26bd0000)
    		 compacting perm gen  total 8192K, used 2898K [0x26bd0000, 0x273d0000, 0x2abd0000)
    		   the space 8192K,  35% used [0x26bd0000, 0x26ea4ba8, 0x26ea4c00, 0x273d0000)
    		    ro space 8192K,  66% used [0x2abd0000, 0x2b12bcc0, 0x2b12be00, 0x2b3d0000)
    		    rw space 12288K,  46% used [0x2b3d0000, 0x2b972060, 0x2b972200, 0x2bfd0000)
    		}
    		, 0.0757599 secs]
    
  • -Xloggc:filename:与上面几个配合使用,把相关日志信息记录到文件以便分析。

常见配置汇总

  • -XX:+CMSIncrementalMode:设置为增量模式。适用于单CPU情况。
  • -XX:ParallelGCThreads=n:设置并发收集器年轻代收集方式为并行收集时,使用的CPU数。并行收集线程数。
  • -XX:ParallelGCThreads=n:设置并行收集器收集时使用的CPU数。并行收集线程数。
  • -XX:MaxGCPauseMillis=n:设置并行收集最大暂停时间
  • -XX:GCTimeRatio=n:设置垃圾回收时间占程序运行时间的百分比。公式为1/(1+n)
  • -XX:+PrintGC
  • -XX:+PrintGCDetails
  • -XX:+PrintGCTimeStamps
  • -Xloggc:filename
  • -XX: +UseSerialGC:设置串行收集器
  • -XX: +UseParallelGC:设置并行收集器
  • -XX: +UseParalledlOldGC:设置并行年老代收集器
  • -XX: +UseConcMarkSweepGC:设置并发收集器
  • -Xms:初始堆大小
  • -Xmx:最大堆大小
  • -XX:NewSize=n:设置年轻代大小
  • -XX:NewRatio=n:设置年轻代和年老代的比值。如:为3,表示年轻代与年老代比值为1:3,年轻代占整个年轻代年老代和的1/4
  • -XX:SurvivorRatio=n:年轻代中Eden区与两个Survivor区的比值。注意Survivor区有两个。如:3,表示Eden:Survivor=3:2,一个Survivor区占整个年轻代的1/5
  • -XX:MaxPermSize=n:设置持久代大小 a. 堆设置 b. 收集器设置 c. 垃圾回收统计信息 d. 并行收集器设置 f. 并发收集器设置

调优总结

年轻代大小选择

  • 响应时间优先的应用:尽可能设大,直到接近系统的最低响应时间限制(根据实际情况选择)。在此种情况下,年轻代收集发生的频率也是最小的。同时,减少到达年老代的对象。
  • 吞吐量优先的应用:尽可能的设置大,可能到达Gbit的程度。因为对响应时间没有要求,垃圾收集可以并行进行,一般适合8CPU以上的应用。
  • 年老代大小选择
  • 并发垃圾收集信息
  • 持久代并发收集次数
  • 传统GC信息
  • 花在年轻代和年老代回收上的时间比例
  • 响应时间优先的应用:年老代使用并发收集器,所以其大小需要小心设置,一般要考虑并发会话率和会话持续时间等一些参数。如果堆设置小了,可以会造成内存碎片、高回收频率以及应用暂停而使用传统的标记清除方式;如果堆大了,则需要较长的收集时间。最优化的方案,一般需要参考以下数据获得:减少年轻代和年老代花费的时间,一般会提高应用的效率
  • 吞吐量优先的应用:一般吞吐量优先的应用都有一个很大的年轻代和一个较小的年老代。原因是,这样可以尽可能回收掉大部分短期对象,减少中期的对象,而年老代尽存放长期存活对象。
  • 较小堆引起的碎片问题 因为年老代的并发收集器使用标记、清除算法,所以不会对堆进行压缩。当收集器回收时,他会把相邻的空间进行合并,这样可以分配给较大的对象。但是,当堆空间较小时,运行一段时间以后,就会出现“碎片”,如果并发收集器找不到足够的空间,那么并发收集器将会停止,然后使用传统的标记、清除方式进行回收。如果出现“碎片”,可能需要进行如下配置:
  • -XX:+UseCMSCompactAtFullCollection:使用并发收集器时,开启对年老代的压缩。
  • -XX:CMSFullGCsBeforeCompaction=0:上面配置开启的情况下,这里设置多少次Full GC后,对年老代进行压缩

关于Axios的GET与DELETE用法注意事项

Published on:
Tags: Vue.js Axios

axios的接口定义如下

vue1

config定义如下:

vue2

因此,我们在使用get和delete时需要注意,这两个接口接收的第二个参数是config。用时,就需要区别对待,且需要与后台定义对应。

  1. 如果想参数在Query Parameter里面,那就用{params: params},后台那边会用RequestParam接收
  2. 如果想参数在Payload里面,那就用{data: params},后台那边会用RequestBody接收

如果后台不匹配,可能会抛ContentType错误的异常,如:

vue3

Trouble Shooting —— Redis AOF rewrite错误导致Redis被Block住

问题现状:

redis-cli 上去执行任何命令返回:connnection reset by peer

重启的应用无法连接到redis,已经建立连接的应用可以正常使用。

分析过程:

第一反应查看redis 日志,如下:

1838:M 16 Aug 01:07:39.319 # Error opening /setting AOF rewrite IPC pipes: Numerical result out of range
1838:M 16 Aug 01:07:39.319 * Starting automatic rewriting of AOF on 110% growth
1838:M 16 Aug 01:07:39.319 # Error opening /setting AOF rewrite IPC pipes: Numerical result out of range
1838:M 16 Aug 01:07:39.419 * Starting automatic rewriting of AOF on 110% growth
1838:M 16 Aug 01:07:39.419 # Error opening /setting AOF rewrite IPC pipes: Numerical result out of range
1838:M 16 Aug 01:07:39.441 # Error registering fd event for the new client: Numerical result out of range (fd=10311)
1838:M 16 Aug 01:07:39.457 # Error registering fd event for the new client: Numerical result out of range (fd=10311)
1838:M 16 Aug 01:07:39.457 # Error registering fd event for the new client: Numerical result out of range (fd=10311)
1838:M 16 Aug 01:07:39.461 # Error registering fd event for the new client: Numerical result out of range (fd=10311)
1838:M 16 Aug 01:07:39.461 # Error registering fd event for the new client: Numerical result out of range (fd=10311)
1838:M 16 Aug 01:07:39.462 # Error registering fd event for the new client: Numerical result out of range (fd=10311)

上面有两种错误日志

  • Error opening /setting AOF rewrite IPC pipes: Numerical result out of range
    • 写aof出错了,超限
  • Error registering fd event for the new client: Numerical result out of range (fd=10311)
    • 创建连接没有成功,能看到fd已经是10311 过万了

出现这种问题第一个先去看一下redis现在有多少个连接数

>netstat -anp|grep 6379
>499

查看redis.conf中配置maxclients没有配置,redis默认为10000 这个时候有个疑问?为什么netstat查看的连接数只有499,但是redis日志中已经过万( fd=10311)?这个问题值得思考? 我们通过查询进程的fd看一下具体打开了多少个连接(在linux中任何连接都是open file)

>ls -al /proc/1838/fd | grep socket | wc -l
>499
 
>ls -al /proc/1838/fd | wc -l
>10322

为什么fd中socket的只有499,所有类型的确是10322呢?通过具体查看发现有9823个全都是pipe类型的连接

>ls -al /proc/1838/fd | grep pipe | wc -l
>9823

为什么redis进程会有那么多pipe的连接呢? 难道是我们redis client使用的pipeline导致的连接泄漏?

于是查看了Jedis的源码

  /**
   * Synchronize pipeline by reading all responses. This operation close the pipeline. In order to
   * get return values from pipelined commands, capture the different Response&lt;?&gt; of the
   * commands you execute.
   */
  public void sync() {
    if (getPipelinedResponseLength() > 0) {
      List<Object> unformatted = client.getAll();
      for (Object o : unformatted) {
        generateResponse(o);
      }
    }
  }

能看到注释中有描述调用这个方法会操作连接关闭:This operation close the pipeline

又询问了开发的同学我们目前没有使用到pipelined,因此排除了这个可能

那问题来了是什么原因导致的pipe连接过多?

网上兜了一圈没发现有价值的信息,没办法只能去扫redis源码,

accetpCommonHandler函数源码:

static void acceptCommonHandler(int fd, int flags) {  
    redisClient *c;  
    if ((c = createClient(fd)) == NULL) {  
        redisLog(REDIS_WARNING,  
            "Error registering fd event for the new client: %s (fd=%d)",  
            strerror(errno),fd);  
        close(fd); /* May be already closed, just ignore errors */  
        return;  
    }  
    /* If maxclient directive is set and this is one client more... close the 
     * connection. Note that we create the client instead to check before 
     * for this condition, since now the socket is already set in non-blocking 
     * mode and we can send an error for free using the Kernel I/O */  
    if (listLength(server.clients) > server.maxclients) {  
        char *err = "-ERR max number of clients reached\r\n";  
   
        /* That's a best effort error message, don't check write errors */  
        if (write(c->fd,err,strlen(err)) == -1) {  
            /* Nothing to do, Just to avoid the warning... */  
        }  
        server.stat_rejected_conn++;  
        freeClient(c);  
        return;  
    }  
    server.stat_numconnections++;  
    c->flags |= flags;  
}

ps.这个函数主要调用createClient初始化客户端相关数据结构以及对应的socket,初始化后会判断当前连接的客户端是否超过最大值,如果超过的话,会拒绝这次连接。否则,更新客户端连接数的计数。 数据结构redisClient用于表示一个客户端的连接,包括一个客户多次请求的状态,createClient函数主要是初始化这个数据结构。在createClient函数中,首先是创建redisClient,然后是设置socket的属性,然后添加该socket的读事件

createClient函数源码:

if (fd != -1) {  
    anetNonBlock(NULL,fd);  
    // <MM>  
    // 关闭Nagle算法,提升响应速度  
    // </MM>  
    anetEnableTcpNoDelay(NULL,fd);  
    if (server.tcpkeepalive)  
        anetKeepAlive(NULL,fd,server.tcpkeepalive);  
    if (aeCreateFileEvent(server.el,fd,AE_READABLE,  
        readQueryFromClient, c) == AE_ERR)  
    {  
        close(fd);  
        zfree(c);  
        return NULL;  
    }  
}

ps.将socket设置为非阻塞的并且no delay,关闭Nagle算法,提升响应速度。最后会注册socket的读事件,事件处理函数是readQueryFromClient,这个函数便是客户端请求的起点,之后会详细介绍。

createClient函数的最后部分,就是对redisClient的属性初始化,代码不再列出。

当从acceptTcpHandler返回后,客户端的连接就建立完毕,接下来就是等待客户端的请求。

以上就是这个错误涉及到的redis源码

在redis的github上发现了有类似的问题issue:https://github.com/antirez/redis/issues/2857

在源码aof.c文件中

/* Parent */    
server.stat_fork_time = ustime()-start;    
server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */    
latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);    
if (childpid == -1) {    
serverLog(LL_WARNING,    
"Can't rewrite append only file in background: fork: %s",    
strerror(errno));      
return C_ERR;    
}

源码发现在报出Can’t rewrite append only file in background: fork: %s这个错误的时候,没有关闭pipe连接

因此看到了redis官方的修复说明已经修复了这个问题,翻出github上的提交记录,如下

redis1

这个时候看到了希望

于是搜索日志寻找是否有上图的错误:Can’t rewrite append only file in background: fork

1838:M 15 Aug 13:52:01.101 # Can't rewrite append only file in background: fork: Cannot allocate memory
1838:M 15 Aug 13:52:01.202 * Starting automatic rewriting of AOF on 100% growth
1838:M 15 Aug 13:52:01.203 # Can't rewrite append only file in background: fork: Cannot allocate memory
1838:M 15 Aug 13:52:01.303 * Starting automatic rewriting of AOF on 100% growth
1838:M 15 Aug 13:52:01.304 # Can't rewrite append only file in background: fork: Cannot allocate memory

有很多我这里截取了前面的,总共出现的次数

>less redis.log.1 | grep "Can't rewrite append only file in background: fork" | wc -l
>1644

基本可以断定是这个问题引发的连锁反应,这个时候我们需要研究一下Redis AOF机制,最终确认是否是这个问题导致。

研究redis AOF机制

redis aof rewirte机制,自动触发bgrewritedaof的条件:

long long growth =(server.appendonly_current_size*100/base) - 100;
if (growth >=server.auto_aofrewrite_perc)

我们的配置文件配置的auto-aof-rewrite-percentage 为100,也就是说当写入日志文件文件大小超过上次rewrite之后的文件大小的百分之100的时候就触发rewrite(也就是超过2倍)

ps.rewrite之后aof文件会保存keys的最后的状态,清除掉之前冗余的,来缩小这个文件。

通过分析aof rewrite发现rewrite出错就是导致Redis连接数超过最大值的罪魁祸首。

分析总结:

基本可以定位到,这个错误是个连锁反应最终导致Redis服务出现问题 * 首先redis在进行aof的rewrite的时候,会检查机器可以用的内存够不够支撑做aof rewrite,这个时候我们机器的可用内存太小,因此报了如下错误

Can't rewrite append only file in background: fork: Cannot allocate memory
  • 但是rewirte自动触发机制当达到2倍的时候会一直触发,他就会一直尝试aof rewrite
  • 在aof rewrite尝试的过程中,已经创建的连接还是可以正常使用,这导致aof的auto_aofrewrite_perc一直在增长但是无法写入到aof文件中,因此又暴漏出另外一个错误,如下所示
1838:M 16 Aug 01:07:39.319 * Starting automatic rewriting of AOF on 110% growth
1838:M 16 Aug 01:07:39.319 # Error opening /setting AOF rewrite IPC pipes: Numerical result out of range
  • 当aof rewirte出错时,从redis代码也能看到,他没有调用close pipes管道连接,这个就造成了服务器上有大量连接被占用(pipe类型)
>netstat -anp|grep 6379
>499

>ls -al /proc/1838/fd | grep socket | wc -l
>499

>ls -al /proc/1838/fd | grep pipe | wc -l
>9823

>ls -al /proc/1838/fd | wc -l
>10322
  • 当连接到达maxclients 10000时就会拒绝新建连接,并且报如下错误
Error registering fd event for the new client: Numerical result out of range (fd=10311)

本次分析的结论

这个问题未解决需要继续跟踪,可能需要升级redis的版本,目前看到3.2.9以上才修复了这个bug,我们用的3.0.6版本的跨度有点大兼容性也需要考虑,还要对redis的配置在进一步研究,通过timeout配置让自动关闭无用的连接着也是一个解决问题的思路,这次只是先定位问题,具体解决还需要进一步研究

这个问题的issue:#2857#2883

这个问题的提交记录:fix #2883, #2857 pipe fds leak when fork() failed on bg aof rw

问题修改的文件:3.2.9分支 -> aof.c文件