Fork me on GitHub

使用线程监听队列

定义一个队列,把数据放到队列中,代码如下:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;

import com.cloud.icenter.common.utils.SystemConfig;

/**
 * 级联模块所有的接口回调,都要先将回调数据发送到队列当中
 * 同事线程监控该队列,进场接口回调处理
 * @author ynxiea
 *
 */
public class CallbackUtil {

    //组织机构回调
    public static final String CALLBACK_ORGAN = "organCallback";
    //资源分类回调
    public static final String CALLBACK_CATEGORY = "categoryCallback";
    //资源目录回调
    public static final String CALLBACK_CATALOG = "catalogCallback";
    //服务发布回调
    public static final String CALLBACK_SERVICE = "serviceCallback";
    //服务申请回调
    public static final String CALLBACK_APPLY = "applyCallback";

    //组织机构回调:类型=1
    public static final String CALLBACK_TYPE_01 = "1";
    //资源分类回调:类型=2
    public static final String CALLBACK_TYPE_02 = "2";
    //资源目录回调:类型=3
    public static final String CALLBACK_TYPE_03 = "3";
    //服务发布回调:类型=4
    public static final String CALLBACK_TYPE_04 = "4";
    //服务申请回调:类型=5
    public static final String CALLBACK_TYPE_05 = "5";

    public static ConcurrentLinkedQueue<Map<String, String>> queue = new ConcurrentLinkedQueue<Map<String, String>>();

    //新增数据到队列当中
    public static void addQueue(Map<String, String> jsonMap) {
        //级联模块:1 = 代表湖北省,省市级联,  2 = 其他地市,不需要地市级联
        String cascadeFlag = SystemConfig.getProperty("cascade.data");
        if (cascadeFlag.equals("1")) {
            queue.add(jsonMap);
        }
    }

    //获取队列大小
    public static int getQueueSize() {
        return queue.size();
    }

    //获取队列的数据
    public static Map<String, String> pullData() {
        if (queue.size() > 0) {
            return queue.poll();
        }
        return null;
    }

    public static void main(String[] args) {
        Map<String, String> jsonMap1 = new HashMap<String, String>();
        jsonMap1.put(CALLBACK_ORGAN, "json");

        Map<String, String> jsonMap2 = new HashMap<String, String>();
        jsonMap2.put(CALLBACK_CATEGORY, "categoryCallback");

        Map<String, String> jsonMap3 = new HashMap<String, String>();
        jsonMap3.put(CALLBACK_CATALOG, "catalogCallback");

        Map<String, String> jsonMap4 = new HashMap<String, String>();
        jsonMap4.put(CALLBACK_SERVICE, "serviceCallback");

        Map<String, String> jsonMap5 = new HashMap<String, String>();
        jsonMap5.put(CALLBACK_APPLY, "applyCallback");

        addQueue(jsonMap1);
        addQueue(jsonMap2);
        addQueue(jsonMap3);
        addQueue(jsonMap4);
        addQueue(jsonMap5);

        System.out.println("队列大小:"+getQueueSize());

        for (int i=0;i<10;i++){
            System.out.println(pullData());
            System.out.println(getQueueSize());
        }

    }

}

使用线程,执行队列任务

/**
 * 常驻线程:执行队列任务
 * @author ynxiea
 *
 */
public class CallbackThread extends Thread {

    @Override
    public void run() {
        while (true) {
            int size = CallbackUtil.getQueueSize();
            if (size > 0) {
                CallbackOperation.doExecute();
            } else {
                try {
                    Thread.sleep(5 * 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

}

接口回调处理接口

public class CallbackOperation {

    private static CallbackService callbackService;

    static {
        callbackService = SpringUtil.getBean(CallbackService.class);
    }

    //执行回调处理
    public static void doExecute() {
        int size = CallbackUtil.getQueueSize();
        for (int i=0;i<size;i++) {
            Map<String, String> jsonMap = CallbackUtil.pullData();
            if (jsonMap != null && !jsonMap.isEmpty()) {
                Set<String> sets = jsonMap.keySet();
                for (String key : sets) {
                    if (CallbackUtil.CALLBACK_ORGAN.equals(key)) {
                        //组织机构回调
                        organCallback(jsonMap.get(key));
                    } else if (CallbackUtil.CALLBACK_CATEGORY.equals(key)) {
                        //资源分类回调
                        categoryCallback(jsonMap.get(key));
                    } else if (CallbackUtil.CALLBACK_CATALOG.equals(key)) {
                        //资源目录回调
                        catalogCallback(jsonMap.get(key));
                    } else if (CallbackUtil.CALLBACK_SERVICE.equals(key)) {
                        //服务发布回调
                        serviceCallback(jsonMap.get(key));
                    } else if (CallbackUtil.CALLBACK_APPLY.equals(key)) {
                        //服务申请回调
                        applyCallback(jsonMap.get(key));
                    } else {
                        //其他回调接口,待处理
                    }
                }
            }
        }
    }

    //发送http请求
    private static void sendHttpData(List<Callback> list, String json) {
        if (list != null && list.size()>0) {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");

            //发送数据:先组装成map对象
            Map<String, String> jsonData = new HashMap<String, String>();
            //时间戳
            jsonData.put("timestamp", sdf.format(new Date()));

            //sign生成待定 String sign = SignGeneration.generationSign(params, sk);
            jsonData.put("sign", "");
            //请求数据
            jsonData.put("data", json);

            for (Callback c: list) {
                Map<String, String> mapParams = null;
                //接口请求参数
                String requestParams = c.getRequestParams();
                if (!StringUtil.isEmpty(requestParams)) {
                    mapParams = CascadeJsonUtil.toObject(requestParams, Map.class);
                }
                if (mapParams!= null && !mapParams.isEmpty()) {
                    jsonData.putAll(mapParams);
                }
                String sendJson =  CascadeJsonUtil.toJson(jsonData);
                try {
                    Map<String,Object> result = CascadeHttpUtil.sendPost(c.getUrlAddress(), sendJson);
                    System.out.println(result);
                    //记录日志
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    //组织机构回调
    private static void organCallback(String json) {
        List<Callback> list = callbackService.queryByType(CallbackUtil.CALLBACK_TYPE_01);
        sendHttpData(list, json);
    }

    //资源分类回调
    private static void categoryCallback(String json) {
        List<Callback> list = callbackService.queryByType(CallbackUtil.CALLBACK_TYPE_02);
        List<Callback> result = new ArrayList<Callback>();
        Map<String, Object> jsonData = CascadeJsonUtil.toObject(json, Map.class);
        //判断传递参数是否存在:organId,如果存在,代表地市的资源分类变化,回调时过滤该组织机构
        if (jsonData.containsKey("organId")) {
            String organId = String.valueOf(jsonData.get("organId"));
            for(Callback c : list) {
                if (!c.getOrganId().equals(organId)) {
                    result.add(c);
                }
            }
        } else {
            result.addAll(list);
        }
        sendHttpData(result, json);
    }

    //资源目录回调
    private static void catalogCallback(String json) {
        List<Callback> list = callbackService.queryByType(CallbackUtil.CALLBACK_TYPE_03);
        List<Callback> result = new ArrayList<Callback>();
        Map<String, Object> jsonData = CascadeJsonUtil.toObject(json, Map.class);
        //判断传递参数是否存在:organId,如果存在,代表地市的资源目录变化,回调时过滤该组织机构
        if (jsonData.containsKey("organId")) {
            String organId = String.valueOf(jsonData.get("organId"));
            for(Callback c : list) {
                if (!c.getOrganId().equals(organId)) {
                    result.add(c);
                }
            }
        } else {
            result.addAll(list);
        }
        sendHttpData(result, json);
    }

    //服务发布回调
    private static void serviceCallback(String json) {
        List<Callback> list = callbackService.queryByType(CallbackUtil.CALLBACK_TYPE_04);
        sendHttpData(list, json);
    }

    //服务申请回调
    private static void applyCallback(String json) {
        List<Callback> list = callbackService.queryByType(CallbackUtil.CALLBACK_TYPE_05);
        sendHttpData(list, json);
    }

}