定义一个队列,把数据放到队列中,代码如下:
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import com.cloud.icenter.common.utils.SystemConfig;
/**
* 级联模块所有的接口回调,都要先将回调数据发送到队列当中
* 同事线程监控该队列,进场接口回调处理
* @author ynxiea
*
*/
public class CallbackUtil {
//组织机构回调
public static final String CALLBACK_ORGAN = "organCallback";
//资源分类回调
public static final String CALLBACK_CATEGORY = "categoryCallback";
//资源目录回调
public static final String CALLBACK_CATALOG = "catalogCallback";
//服务发布回调
public static final String CALLBACK_SERVICE = "serviceCallback";
//服务申请回调
public static final String CALLBACK_APPLY = "applyCallback";
//组织机构回调:类型=1
public static final String CALLBACK_TYPE_01 = "1";
//资源分类回调:类型=2
public static final String CALLBACK_TYPE_02 = "2";
//资源目录回调:类型=3
public static final String CALLBACK_TYPE_03 = "3";
//服务发布回调:类型=4
public static final String CALLBACK_TYPE_04 = "4";
//服务申请回调:类型=5
public static final String CALLBACK_TYPE_05 = "5";
public static ConcurrentLinkedQueue<Map<String, String>> queue = new ConcurrentLinkedQueue<Map<String, String>>();
//新增数据到队列当中
public static void addQueue(Map<String, String> jsonMap) {
//级联模块:1 = 代表湖北省,省市级联, 2 = 其他地市,不需要地市级联
String cascadeFlag = SystemConfig.getProperty("cascade.data");
if (cascadeFlag.equals("1")) {
queue.add(jsonMap);
}
}
//获取队列大小
public static int getQueueSize() {
return queue.size();
}
//获取队列的数据
public static Map<String, String> pullData() {
if (queue.size() > 0) {
return queue.poll();
}
return null;
}
public static void main(String[] args) {
Map<String, String> jsonMap1 = new HashMap<String, String>();
jsonMap1.put(CALLBACK_ORGAN, "json");
Map<String, String> jsonMap2 = new HashMap<String, String>();
jsonMap2.put(CALLBACK_CATEGORY, "categoryCallback");
Map<String, String> jsonMap3 = new HashMap<String, String>();
jsonMap3.put(CALLBACK_CATALOG, "catalogCallback");
Map<String, String> jsonMap4 = new HashMap<String, String>();
jsonMap4.put(CALLBACK_SERVICE, "serviceCallback");
Map<String, String> jsonMap5 = new HashMap<String, String>();
jsonMap5.put(CALLBACK_APPLY, "applyCallback");
addQueue(jsonMap1);
addQueue(jsonMap2);
addQueue(jsonMap3);
addQueue(jsonMap4);
addQueue(jsonMap5);
System.out.println("队列大小:"+getQueueSize());
for (int i=0;i<10;i++){
System.out.println(pullData());
System.out.println(getQueueSize());
}
}
}
使用线程,执行队列任务
/**
* 常驻线程:执行队列任务
* @author ynxiea
*
*/
public class CallbackThread extends Thread {
@Override
public void run() {
while (true) {
int size = CallbackUtil.getQueueSize();
if (size > 0) {
CallbackOperation.doExecute();
} else {
try {
Thread.sleep(5 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
接口回调处理接口
public class CallbackOperation {
private static CallbackService callbackService;
static {
callbackService = SpringUtil.getBean(CallbackService.class);
}
//执行回调处理
public static void doExecute() {
int size = CallbackUtil.getQueueSize();
for (int i=0;i<size;i++) {
Map<String, String> jsonMap = CallbackUtil.pullData();
if (jsonMap != null && !jsonMap.isEmpty()) {
Set<String> sets = jsonMap.keySet();
for (String key : sets) {
if (CallbackUtil.CALLBACK_ORGAN.equals(key)) {
//组织机构回调
organCallback(jsonMap.get(key));
} else if (CallbackUtil.CALLBACK_CATEGORY.equals(key)) {
//资源分类回调
categoryCallback(jsonMap.get(key));
} else if (CallbackUtil.CALLBACK_CATALOG.equals(key)) {
//资源目录回调
catalogCallback(jsonMap.get(key));
} else if (CallbackUtil.CALLBACK_SERVICE.equals(key)) {
//服务发布回调
serviceCallback(jsonMap.get(key));
} else if (CallbackUtil.CALLBACK_APPLY.equals(key)) {
//服务申请回调
applyCallback(jsonMap.get(key));
} else {
//其他回调接口,待处理
}
}
}
}
}
//发送http请求
private static void sendHttpData(List<Callback> list, String json) {
if (list != null && list.size()>0) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
//发送数据:先组装成map对象
Map<String, String> jsonData = new HashMap<String, String>();
//时间戳
jsonData.put("timestamp", sdf.format(new Date()));
//sign生成待定 String sign = SignGeneration.generationSign(params, sk);
jsonData.put("sign", "");
//请求数据
jsonData.put("data", json);
for (Callback c: list) {
Map<String, String> mapParams = null;
//接口请求参数
String requestParams = c.getRequestParams();
if (!StringUtil.isEmpty(requestParams)) {
mapParams = CascadeJsonUtil.toObject(requestParams, Map.class);
}
if (mapParams!= null && !mapParams.isEmpty()) {
jsonData.putAll(mapParams);
}
String sendJson = CascadeJsonUtil.toJson(jsonData);
try {
Map<String,Object> result = CascadeHttpUtil.sendPost(c.getUrlAddress(), sendJson);
System.out.println(result);
//记录日志
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
//组织机构回调
private static void organCallback(String json) {
List<Callback> list = callbackService.queryByType(CallbackUtil.CALLBACK_TYPE_01);
sendHttpData(list, json);
}
//资源分类回调
private static void categoryCallback(String json) {
List<Callback> list = callbackService.queryByType(CallbackUtil.CALLBACK_TYPE_02);
List<Callback> result = new ArrayList<Callback>();
Map<String, Object> jsonData = CascadeJsonUtil.toObject(json, Map.class);
//判断传递参数是否存在:organId,如果存在,代表地市的资源分类变化,回调时过滤该组织机构
if (jsonData.containsKey("organId")) {
String organId = String.valueOf(jsonData.get("organId"));
for(Callback c : list) {
if (!c.getOrganId().equals(organId)) {
result.add(c);
}
}
} else {
result.addAll(list);
}
sendHttpData(result, json);
}
//资源目录回调
private static void catalogCallback(String json) {
List<Callback> list = callbackService.queryByType(CallbackUtil.CALLBACK_TYPE_03);
List<Callback> result = new ArrayList<Callback>();
Map<String, Object> jsonData = CascadeJsonUtil.toObject(json, Map.class);
//判断传递参数是否存在:organId,如果存在,代表地市的资源目录变化,回调时过滤该组织机构
if (jsonData.containsKey("organId")) {
String organId = String.valueOf(jsonData.get("organId"));
for(Callback c : list) {
if (!c.getOrganId().equals(organId)) {
result.add(c);
}
}
} else {
result.addAll(list);
}
sendHttpData(result, json);
}
//服务发布回调
private static void serviceCallback(String json) {
List<Callback> list = callbackService.queryByType(CallbackUtil.CALLBACK_TYPE_04);
sendHttpData(list, json);
}
//服务申请回调
private static void applyCallback(String json) {
List<Callback> list = callbackService.queryByType(CallbackUtil.CALLBACK_TYPE_05);
sendHttpData(list, json);
}
}
