1 #ifndef ISPN_HOTROD_REMOTECACHEBASE_H
2 #define ISPN_HOTROD_REMOTECACHEBASE_H
23 using namespace org::infinispan::query::remote::client;
24 using namespace infinispan::hotrod::event;
26 namespace infinispan {
29 namespace operations {
30 class OperationsFactory;
// Function-pointer type for an unmarshalling helper: it is called with an
// opaque pointer and a byte buffer and returns an untyped pointer.
// NOTE(review): presumably the first argument is the marshaller instance and
// the result is a newly built object — confirm against the call sites.
40 typedef void* (*UnmarshallHelperFn) (
void*,
const std::vector<char> &);
// Forward declarations. All three classes are granted friendship by the
// friend declarations near the end of this class.
45 class KeyUnmarshallerFtor;
46 class ValueUnmarshallerFtor;
47 class RemoteCacheImpl;
// Virtual destructor with an empty body.
52 virtual ~RemoteCacheBase(){}
// Returns a const reference to a std::string.
// NOTE(review): judging by the identifier this is the cache name — confirm.
60 HR_EXTERN const std::string& base_getNameAsString();
// Type-erased get: takes an untyped key pointer and returns an untyped pointer.
// currentTxPtr defaults to an empty shared_ptr when the caller omits it.
// NOTE(review): ownership of the returned pointer is not visible here — confirm.
61 HR_EXTERN void *base_get(
const void *key, std::shared_ptr<Transaction> currentTxPtr = std::shared_ptr<Transaction>());
// Takes a set of byte-buffer keys and returns a map from byte buffer to byte
// buffer. Unlike the neighbouring operations, this one works on
// std::vector<char> rather than untyped pointers and has no transaction argument.
62 HR_EXTERN std::map<std::vector<char>,std::vector<char> > base_getAll(
const std::set<std::vector<char> >& keySet);
// Type-erased put: takes untyped key and value pointers plus two 64-bit
// integers (life, idle) and an optional transaction handle that defaults to
// an empty shared_ptr. Returns an untyped pointer.
// NOTE(review): life/idle are presumably lifespan and max-idle durations; the
// unit is not visible here — confirm. Ownership of the result also needs confirming.
63 HR_EXTERN void *base_put(
const void *key,
const void *value, int64_t life, int64_t idle, std::shared_ptr<Transaction> currentTxPtr = std::shared_ptr<Transaction>());
// Bulk variant: takes a map from untyped key pointer to untyped value pointer,
// the two 64-bit life/idle values and an optional transaction handle
// (defaults to an empty shared_ptr). Returns nothing.
64 HR_EXTERN void base_putAll(
const std::map<const void*, const void*>& map, int64_t life, int64_t idle, std::shared_ptr<Transaction> currentTxPtr = std::shared_ptr<Transaction>());
// Same parameter list and return type as base_put.
// NOTE(review): by name this stores the value only when the key has no
// mapping; the meaning of the returned pointer is not visible here — confirm.
65 HR_EXTERN void *base_putIfAbsent(
const void *key,
const void *value, int64_t life, int64_t idle, std::shared_ptr<Transaction> currentTxPtr = std::shared_ptr<Transaction>());
// Same parameter list and return type as base_put.
// NOTE(review): by name this replaces an existing mapping; the meaning of the
// returned pointer is not visible here — confirm.
66 HR_EXTERN void *base_replace(
const void *key,
const void *value, int64_t life, int64_t idle, std::shared_ptr<Transaction> currentTxPtr = std::shared_ptr<Transaction>());
// Type-erased remove: takes an untyped key pointer and an optional transaction
// handle (defaults to an empty shared_ptr); returns an untyped pointer.
// NOTE(review): presumably the previous value — confirm, along with ownership.
67 HR_EXTERN void *base_remove(
const void *key, std::shared_ptr<Transaction> currentTxPtr = std::shared_ptr<Transaction>());
// Takes an untyped key pointer and an optional transaction handle (defaults
// to an empty shared_ptr); returns a bool.
68 HR_EXTERN bool base_containsKey(
const void *key, std::shared_ptr<Transaction> currentTxPtr = std::shared_ptr<Transaction>());
// Like base_replace but with an extra 64-bit version argument and a bool result.
// NOTE(review): presumably the replace succeeds only when the stored entry
// still carries the given version — confirm.
70 HR_EXTERN bool base_replaceWithVersion(
const void *key,
const void *value, int64_t version, int64_t life, int64_t idle, std::shared_ptr<Transaction> currentTxPtr = std::shared_ptr<Transaction>());
// Takes an untyped key pointer, a 64-bit version and an optional transaction
// handle (defaults to an empty shared_ptr); returns a bool.
71 HR_EXTERN bool base_removeWithVersion(
const void *key, int64_t version, std::shared_ptr<Transaction> currentTxPtr = std::shared_ptr<Transaction>());
// Takes an untyped key pointer, a pointer to a VersionedValue and an optional
// transaction handle (defaults to an empty shared_ptr); returns an untyped pointer.
// NOTE(review): `version` looks like an out-parameter filled by the call — confirm.
72 HR_EXTERN void *base_getWithVersion(
const void* key,
VersionedValue* version, std::shared_ptr<Transaction> currentTxPtr = std::shared_ptr<Transaction>());
// Takes an untyped key pointer, a pointer to a MetadataValue and an optional
// transaction handle (defaults to an empty shared_ptr); returns an untyped pointer.
// NOTE(review): `metadata` looks like an out-parameter filled by the call — confirm.
73 HR_EXTERN void *base_getWithMetadata(
const void* key,
MetadataValue* metadata, std::shared_ptr<Transaction> currentTxPtr = std::shared_ptr<Transaction>());
// Takes an int size and a caller-supplied map of untyped pointers by non-const
// reference; the third parameter (a transaction handle defaulting to an empty
// shared_ptr) is unnamed in this declaration.
// NOTE(review): presumably `mbuf` receives up to `size` entries — confirm.
74 HR_EXTERN void base_getBulk(
int size, std::map<void*, void*> &mbuf, std::shared_ptr<Transaction> = std::shared_ptr<Transaction>());
// Takes an int scope and a caller-supplied vector of untyped pointers by
// non-const reference.
// NOTE(review): the meaning of `scope` values is not visible here — confirm.
75 HR_EXTERN void base_keySet(
int scope, std::vector<void*> &sbuf);
// Takes a caller-supplied string-to-string map by non-const reference.
// NOTE(review): presumably filled with statistic name/value pairs — confirm.
76 HR_EXTERN void base_stats(std::map<std::string,std::string> &sbuf);
// Overload taking the command name as a string and the arguments as a
// string-to-string map; returns a vector of unsigned char.
80 HR_EXTERN std::vector<unsigned char> base_execute(
const std::string &cmdName,
const std::map<std::string,std::string>& args);
// Overload taking the command name as a string and the arguments as a map of
// byte buffers; returns a vector of unsigned char.
81 HR_EXTERN std::vector<unsigned char> base_execute(
const std::string &cmdName,
const std::map<std::vector<char> ,std::vector<char> >& args);
// Overload taking both the command name and the arguments as byte buffers.
// Note the return type is std::vector<char>, not std::vector<unsigned char>
// as in the two overloads that take a std::string name.
82 HR_EXTERN std::vector<char> base_execute(
const std::vector<char> &cmdName,
const std::map<std::vector<char> ,std::vector<char> >& args);
// Returns a CacheTopologyInfo by value; takes no arguments.
83 HR_EXTERN CacheTopologyInfo base_getCacheTopologyInfo();
// Takes a byte vector by value together with a size and returns a byte vector.
// NOTE(review): presumably `qr` is a serialized query request and the result
// a serialized response; how `size` relates to qr.size() is not visible — confirm.
85 HR_EXTERN std::vector<unsigned char> base_query_char(std::vector<unsigned char> qr,
size_t size);
// Takes a ClientListener by non-const reference, two lists of byte-buffer
// parameters (one named for the filter factory, one for the converter
// factory) and a no-argument callback passed by const reference.
// NOTE(review): both parameter lists are declared as const values, not const
// references, so each call copies them. Switching to const& would need the
// out-of-view definition changed in lockstep.
// NOTE(review): presumably recoveryCallback is invoked after a failover — confirm.
87 HR_EXTERN void base_addClientListener(
ClientListener &clientListener,
const std::vector<std::vector<char> > filterFactoryParam,
const std::vector<std::vector<char> > converterFactoryParams,
const std::function<
void()> &recoveryCallback);
// Takes a script name and a script body, both as byte buffers.
// NOTE(review): presumably registers the script on the server — confirm.
89 HR_EXTERN void putScript(
const std::vector<char>& name,
const std::vector<char>& script);
// Constructor. The initializer list sets transactionManager from tm,
// transactionTable from tt, and the forceReturnValue / transactional members
// from the parameters of the same name. Inside the initializer list the name
// before the parentheses is the member and the one inside is the parameter.
// The body is empty.
91 HR_EXTERN RemoteCacheBase(
TransactionManager& tm,
TransactionTable& tt,
bool forceReturnValue,
bool transactional) : transactionManager(tm), transactionTable(tt), forceReturnValue(forceReturnValue), transactional(transactional) {}
// Takes an untyped pointer.
// NOTE(review): presumably stored in the remoteCachePtr member declared
// further down — confirm against the implementation.
96 HR_EXTERN void setRemoteCachePtr(
void* ptr);
// Fragment of a member template over K, V and a parameter pack Params.
// NOTE(review): this listing omits several lines (the embedded numbering
// skips 99, 118, 120, 124, 130, 133 and 136-141), including the function
// signature and the opening of the callable assigned to
// cql.listenerCustomEvent. Read the comments below with that in mind.
98 template<
class K,
class V,
typename... Params>
// One factory name is used for both the filter factory and the converter
// factory of the listener.
100 static char CONTINUOUS_QUERY_FILTER_FACTORY_NAME[] =
101 "continuous-query-filter-converter-factory";
102 cql.cl.filterFactoryName = std::vector<char>(
103 CONTINUOUS_QUERY_FILTER_FACTORY_NAME,
104 CONTINUOUS_QUERY_FILTER_FACTORY_NAME
105 + strlen(CONTINUOUS_QUERY_FILTER_FACTORY_NAME));
106 cql.cl.converterFactoryName = std::vector<char>(
107 CONTINUOUS_QUERY_FILTER_FACTORY_NAME,
108 CONTINUOUS_QUERY_FILTER_FACTORY_NAME
109 + strlen(CONTINUOUS_QUERY_FILTER_FACTORY_NAME));
// The query returned by cql.getQuery() is marshalled and becomes the single
// filter-factory parameter; the converter-factory parameter list stays empty.
110 BasicTypesProtoStreamMarshaller<std::string> paramMarshaller;
111 std::vector<std::vector<char> > filterFactoryParams;
112 std::vector<char> param;
113 paramMarshaller.marshall(cql.getQuery(), param);
114 filterFactoryParams.push_back(param);
115 std::vector<std::vector<char> > converterFactoryParams;
116 cql.cl.useRawData =
true;
// Event handler: the event payload is parsed into `wm`, whose wrapped bytes
// are parsed into a ContinuousQueryResult. (`wm`, `e` and `i` are declared on
// lines missing from this listing.)
117 cql.listenerCustomEvent =
119 ContinuousQueryResult r;
121 wm.ParseFromArray(e.getEventData().data(), e.getEventData().size());
122 r.ParseFromString(wm.wrappedmessagebytes());
123 auto resultType = r.resulttype();
// The projection is turned into a tuple of Params..., and the key bytes are
// unmarshalled through baseKeyUnmarshall and cast to K*.
// NOTE(review): nothing visible here releases `k` — confirm ownership.
125 std::tuple<Params...> tp=
popTuple<Params...>(r.projection(), i) ;
126 K* k = (K*)this->baseKeyUnmarshall(std::vector<char>(r.key().begin(), r.key().end()));
// Dispatch on the result type to the matching listener. A line is missing
// after each call in this listing, so whatever ends each case is not shown.
127 switch (resultType) {
128 case ContinuousQueryResult_ResultType_JOINING:
129 cql.getJoiningListener()(*k, tp);
131 case ContinuousQueryResult_ResultType_LEAVING:
132 cql.getLeavingListener()(*k, tp);
134 case ContinuousQueryResult_ResultType_UPDATED:
135 cql.getUpdatedListener()(*k, tp);
// Attach the handler to the client listener, then register the listener with
// the prepared factory parameters and the failover listener as the callback.
142 cql.cl.add_listener(cql.listenerCustomEvent);
143 this->base_addClientListener(cql.cl, filterFactoryParams,
144 converterFactoryParams, cql.getFailoverListener());
// Fragment of a member template over K and V only. It mirrors the Params...
// variant, but the listeners receive the unmarshalled value, not a
// projection tuple.
// NOTE(review): this listing omits several lines (the embedded numbering
// skips 147, 166, 168, 177, 180 and 183-188), including the function
// signature and the opening of the callable assigned to
// cql.listenerCustomEvent.
146 template <
class K,
class V>
// One factory name is used for both the filter factory and the converter
// factory of the listener.
148 static char CONTINUOUS_QUERY_FILTER_FACTORY_NAME[] =
149 "continuous-query-filter-converter-factory";
150 cql.cl.filterFactoryName = std::vector<char>(
151 CONTINUOUS_QUERY_FILTER_FACTORY_NAME,
152 CONTINUOUS_QUERY_FILTER_FACTORY_NAME
153 + strlen(CONTINUOUS_QUERY_FILTER_FACTORY_NAME));
154 cql.cl.converterFactoryName = std::vector<char>(
155 CONTINUOUS_QUERY_FILTER_FACTORY_NAME,
156 CONTINUOUS_QUERY_FILTER_FACTORY_NAME
157 + strlen(CONTINUOUS_QUERY_FILTER_FACTORY_NAME));
// The query returned by cql.getQuery() is marshalled and becomes the single
// filter-factory parameter; the converter-factory parameter list stays empty.
158 BasicTypesProtoStreamMarshaller<std::string> paramMarshaller;
159 std::vector<std::vector<char> > filterFactoryParams;
160 std::vector<char> param;
161 paramMarshaller.marshall(cql.getQuery(), param);
162 filterFactoryParams.push_back(param);
163 std::vector<std::vector<char> > converterFactoryParams;
164 cql.cl.useRawData =
true;
// Event handler: the event payload is parsed into `wm`, whose wrapped bytes
// are parsed into a ContinuousQueryResult. (`wm` and `e` are declared on
// lines missing from this listing.)
165 cql.listenerCustomEvent =
167 ContinuousQueryResult r;
169 wm.ParseFromArray(e.getEventData().data(), e.getEventData().size());
170 r.ParseFromString(wm.wrappedmessagebytes());
171 auto resultType = r.resulttype();
// Key and value bytes are unmarshalled through baseKeyUnmarshall /
// baseValueUnmarshall and cast to K* / V*.
// NOTE(review): nothing visible here releases `k` or `v` — confirm ownership.
172 K* k = (K*)this->baseKeyUnmarshall(std::vector<char>(r.key().begin(), r.key().end()));
173 V* v = (V*)this->baseValueUnmarshall(std::vector<char>(r.value().begin(), r.value().end()));
// Dispatch on the result type to the matching listener. A line is missing
// after each call in this listing, so whatever ends each case is not shown.
174 switch (resultType) {
175 case ContinuousQueryResult_ResultType_JOINING:
176 cql.getJoiningListener()(*k, *v);
178 case ContinuousQueryResult_ResultType_LEAVING:
179 cql.getLeavingListener()(*k, *v);
181 case ContinuousQueryResult_ResultType_UPDATED:
182 cql.getUpdatedListener()(*k, *v);
// Attach the handler to the client listener, then register the listener with
// the prepared factory parameters and the failover listener as the callback.
189 cql.cl.add_listener(cql.listenerCustomEvent);
190 this->base_addClientListener(cql.cl, filterFactoryParams,
191 converterFactoryParams, cql.getFailoverListener());
196 this->valueCopyConstructor = valueCopyConstrunctor;
201 this->valueDestructor = valueDestructor;
206 this->valueMarshallerFn = valueMarshaller;
// Data members visible in this listing; the embedded numbering (215, 216,
// 220) shows that other members in between are not reproduced here.
// Untyped pointer, null by default.
215 void *remoteCachePtr=
nullptr;
// No in-class initializer; the constructor sets it from its parameter.
216 bool forceReturnValue;
// Shared ownership of the RemoteCacheImpl.
220 std::shared_ptr<RemoteCacheImpl> impl;
// Takes an untyped key pointer and a byte buffer by non-const reference.
// NOTE(review): presumably serializes the key into `buf` — confirm.
223 HR_EXTERN void baseKeyMarshall(
const void* k, std::vector<char> &buf);
// Takes an untyped value pointer and a byte buffer by non-const reference.
// NOTE(review): presumably serializes the value into `buf` — confirm.
224 HR_EXTERN void baseValueMarshall(
const void* v, std::vector<char> &buf);
// Takes a byte buffer and returns an untyped pointer. The continuous-query
// handlers in this header cast the result to K*.
// NOTE(review): ownership of the returned pointer is not visible here — confirm.
228 HR_EXTERN void* baseKeyUnmarshall(
const std::vector<char> &buf);
// Takes a byte buffer and returns an untyped pointer. The K/V
// continuous-query handler in this header casts the result to V*.
// NOTE(review): ownership of the returned pointer is not visible here — confirm.
229 HR_EXTERN void* baseValueUnmarshall(
const std::vector<char> &buf);
// Takes a Transaction by reference and an untyped key pointer; returns an
// untyped pointer. Declared without HR_EXTERN, unlike the base_* operations.
// NOTE(review): presumably the get path used when a transaction is active — confirm.
230 void* transactional_base_get(
Transaction& currentTransaction,
const void* key);
// Takes a Transaction by reference, untyped key and value pointers, the two
// 64-bit life/idle values and a bool forceRV; returns an untyped pointer.
// Declared without HR_EXTERN, unlike the base_* operations.
// NOTE(review): forceRV presumably controls whether the previous value is
// returned — confirm against the implementation.
231 void* transactional_base_put(
Transaction& currentTransaction,
const void* key,
const void* val, int64_t life,
232 int64_t idle,
bool forceRV);
// Friend declarations: four classes plus two class templates from
// ::infinispan::hotrod::event are given access to this class's non-public
// members.
236 friend class RemoteCacheImpl;
237 friend class NearRemoteCacheImpl;
238 friend class KeyUnmarshallerFtor;
239 friend class ValueUnmarshallerFtor;
// Every specialization of CacheClientListener<K, V> is a friend.
240 template <
class K,
class V>
241 friend class ::infinispan::hotrod::event::CacheClientListener;
// Every specialization of ContinuousQueryListener<Params...> is a friend.
243 template <
typename... Params>
244 friend class ::infinispan::hotrod::event::ContinuousQueryListener;
Definition: Transactions.h:21
Definition: Transactions.h:174
void(* MarshallHelperFn)(void *, const void *, std::vector< char > &)
Definition: RemoteCacheBase.h:39
Definition: Transactions.h:73
#define HR_EXTERN
Definition: ImportExport.h:35
std::function< void(const void *, std::vector< char > &)> ValueMarshallerHelperFn
Definition: RemoteCacheBase.h:43
Definition: ClientEvent.h:145
Definition: TransactionManager.h:24
std::function< void *(const void *)> ValueCopyConstructHelperFn
Definition: RemoteCacheBase.h:41
Definition: DataFormat.h:79
std::tuple< H, Params...> popTuple(const RepeatedPtrField< WrappedMessage > &wMsgs, int &k)
Definition: QueryUtils.h:94
Definition: RemoteCacheManager.h:43
Definition: ClientListener.h:35
void *(* UnmarshallHelperFn)(void *, const std::vector< char > &)
Definition: RemoteCacheBase.h:40
Definition: CacheClientListener.h:33
std::function< void(const void *)> ValueDestructorHelperFn
Definition: RemoteCacheBase.h:42
Definition: ContinuousQueryListener.h:34
Definition: Transactions.h:130
Definition: RemoteExecution.h:29
Definition: VersionedValue.h:9