写在前面
生产上在高并发使用java的UpdateByQueryRequest对es进行update的时候(与java线程无关,UpdateByQueryRequest内部本身就是异步请求)会出现一个情况:第一次更新成功,后续短时间内的更新条数都是0
分析
后来根据查阅资料发现因为如下原因:
es内部为了防止出现并发操作的脏写情况出现,使用了_version的乐观锁控制。
_version元数据
第一次创建一个document的时候,它的_version内部版本号就是1;以后,每次对这个document执行修改或者删除操作,都会对
这个_version版本号自动加1;哪怕是删除
在删除一个document后,他不是立即物理删除掉的,因为它的一些版本号等信息还是保留的,先删除一条document,再重新创建
这条document,其实会在delete version基础之上,再把version+1
我们可以使用如下代码进行测试:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
| @Test public void EsVersionTest() throws Exception {
String indexName=EsIndexConstants.IiapIndexConsts.IMPL_DEVICE_INDEX; BoolQueryBuilder idQuery = QueryBuilders.boolQuery(); idQuery.must(QueryBuilders.termQuery("deviceId", "11111111"));
Map<String,Long> map=new HashMap<>();
Thread t3=new Thread(new Runnable() { @Override public void run() { Script script2= new Script("ctx._source.deleted='"+1+"'" + ";ctx._source.updatedTime='"+System.currentTimeMillis()+"'"+ ";ctx._source.updatedBy='canalTest3'"); UpdateByQueryRequestBuilder updateByQuery2 = UpdateByQueryAction.INSTANCE.newRequestBuilder(elasticsearchTemplate.getClient()); updateByQuery2.source(indexName).abortOnVersionConflict(false) .filter(idQuery) .script(script2); long updated2 = updateByQuery2.get().getUpdated(); map.put("t3",updated2); } },"t3");
Thread t4=new Thread(new Runnable() { @Override public void run() { Script script= new Script("ctx._source.deleted='"+1+"'" + ";ctx._source.updatedTime='"+System.currentTimeMillis()+"'"+ ";ctx._source.updatedBy='canalTest4'"); UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(elasticsearchTemplate.getClient()); updateByQuery.source(indexName).abortOnVersionConflict(false) .filter(idQuery) .script(script); long updated = updateByQuery.get().getUpdated(); map.put("t4",updated); } },"t4");
t3.start(); t4.start(); t3.join(); t4.join(); log.info("map 2 :{}",map); log.info("ssssssssssss"); }
@Test public void EsVersionTest2() throws Exception {
String indexName=EsIndexConstants.IiapIndexConsts.IMPL_DEVICE_INDEX; BoolQueryBuilder idQuery = QueryBuilders.boolQuery(); idQuery.must(QueryBuilders.termQuery("deviceId", "1111111"));
Map<String,Long> map=new HashMap<>(); for(int i=0;i<100;i++){ Map<String,Object> params=new HashMap<>(); params.put("deleted","1"); params.put("updatedTime",Long.toString(System.currentTimeMillis())); params.put("updatedBy","T"+i); Script script2= new Script(ScriptType.INLINE, "painless","ctx._source.deleted=params.deleted" + ";ctx._source.updatedTime=params.updatedTime"+ ";ctx._source.updatedBy=params.updatedBy",params); UpdateByQueryRequestBuilder updateByQuery2 = UpdateByQueryAction.INSTANCE.newRequestBuilder(elasticsearchTemplate.getClient()); updateByQuery2.source(indexName) .filter(idQuery) .script(script2); long updated2 = updateByQuery2.get().getUpdated(); map.put("t3",updated2);
if(updated2==0){ DeviceThreadExecutor.DeviceHandler deviceHandler=new DeviceThreadExecutor.DeviceHandler(); deviceHandler.setUpdateByQueryRequestBuilder(updateByQuery2); deviceHandler.setUpdated(0L); DeviceThreadExecutor.runDeviceHandler(deviceHandler); }
log.info("map 2 :{}",map); } log.info("sssssssssssss"); }
|
执行后可以看到,第一次update成功后,后续的更新都没有成功。
未完待续…