Infinispan HotRod C++ Client  8.3.1.Final
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
RemoteCacheBase.h
Go to the documentation of this file.
1 #ifndef ISPN_HOTROD_REMOTECACHEBASE_H
2 #define ISPN_HOTROD_REMOTECACHEBASE_H
3 
4 
5 
11 #include "infinispan/hotrod/Flag.h"
18 #include <map>
19 #include <set>
20 #include <vector>
21 #include <future>
22 
23 using namespace org::infinispan::query::remote::client;
24 using namespace infinispan::hotrod::event;
25 
26 namespace infinispan {
27 namespace hotrod {
28 
29 namespace operations {
30 class OperationsFactory;
31 }
32 
33 namespace event {
34 template <class K, class V> class CacheClientListener;
35 #ifndef SWIGCSHARP
36 template <typename... Params> class ContinuousQueryListener;
37 #endif
38 }
39 typedef void (*MarshallHelperFn) (void*, const void*, std::vector<char> &);
40 typedef void* (*UnmarshallHelperFn) (void*, const std::vector<char> &);
41 typedef std::function<void* (const void *)> ValueCopyConstructHelperFn;
42 typedef std::function<void (const void *)> ValueDestructorHelperFn;
43 typedef std::function<void (const void *, std::vector<char> &)> ValueMarshallerHelperFn;
44 
45 class KeyUnmarshallerFtor;
46 class ValueUnmarshallerFtor;
47 class RemoteCacheImpl;
48 
49 class RemoteCacheBase
50 {
51 public:
52  virtual ~RemoteCacheBase(){}
53  HR_EXTERN uint32_t base_prepareCommit(XID xid, TransactionContext& tctx);
54  HR_EXTERN uint32_t base_commit(XID xid, TransactionContext& tctx);
55  HR_EXTERN uint32_t base_rollback(XID xid, TransactionContext& tctx);
56 
57 
58 protected:
59  HR_EXTERN const char *base_getName();
60  HR_EXTERN const std::string& base_getNameAsString();
61  HR_EXTERN void *base_get(const void *key, std::shared_ptr<Transaction> currentTxPtr = std::shared_ptr<Transaction>());
62  HR_EXTERN std::map<std::vector<char>,std::vector<char> > base_getAll(const std::set<std::vector<char> >& keySet);
63  HR_EXTERN void *base_put(const void *key, const void *value, int64_t life, int64_t idle, std::shared_ptr<Transaction> currentTxPtr = std::shared_ptr<Transaction>());
64  HR_EXTERN void base_putAll(const std::map<const void*, const void*>& map, int64_t life, int64_t idle, std::shared_ptr<Transaction> currentTxPtr = std::shared_ptr<Transaction>());
65  HR_EXTERN void *base_putIfAbsent(const void *key, const void *value, int64_t life, int64_t idle, std::shared_ptr<Transaction> currentTxPtr = std::shared_ptr<Transaction>());
66  HR_EXTERN void *base_replace(const void *key, const void *value, int64_t life, int64_t idle, std::shared_ptr<Transaction> currentTxPtr = std::shared_ptr<Transaction>());
67  HR_EXTERN void *base_remove(const void *key, std::shared_ptr<Transaction> currentTxPtr = std::shared_ptr<Transaction>());
68  HR_EXTERN bool base_containsKey(const void *key, std::shared_ptr<Transaction> currentTxPtr = std::shared_ptr<Transaction>());
69  HR_EXTERN void base_ping();
70  HR_EXTERN bool base_replaceWithVersion(const void *key, const void *value, int64_t version, int64_t life, int64_t idle, std::shared_ptr<Transaction> currentTxPtr = std::shared_ptr<Transaction>());
71  HR_EXTERN bool base_removeWithVersion(const void *key, int64_t version, std::shared_ptr<Transaction> currentTxPtr = std::shared_ptr<Transaction>());
72  HR_EXTERN void *base_getWithVersion(const void* key, VersionedValue* version, std::shared_ptr<Transaction> currentTxPtr = std::shared_ptr<Transaction>());
73  HR_EXTERN void *base_getWithMetadata(const void* key, MetadataValue* metadata, std::shared_ptr<Transaction> currentTxPtr = std::shared_ptr<Transaction>());
74  HR_EXTERN void base_getBulk(int size, std::map<void*, void*> &mbuf, std::shared_ptr<Transaction> = std::shared_ptr<Transaction>());
75  HR_EXTERN void base_keySet(int scope, std::vector<void*> &sbuf);
76  HR_EXTERN void base_stats(std::map<std::string,std::string> &sbuf);
77  HR_EXTERN void base_clear();
78  HR_EXTERN uint64_t base_size();
79  HR_EXTERN void base_withFlags(Flag flag);
80  HR_EXTERN std::vector<unsigned char> base_execute(const std::string &cmdName, const std::map<std::string,std::string>& args);
81  HR_EXTERN std::vector<unsigned char> base_execute(const std::string &cmdName, const std::map<std::vector<char> ,std::vector<char> >& args);
82  HR_EXTERN std::vector<char> base_execute(const std::vector<char> &cmdName, const std::map<std::vector<char> ,std::vector<char> >& args);
83  HR_EXTERN CacheTopologyInfo base_getCacheTopologyInfo();
84  HR_EXTERN QueryResponse base_query(const QueryRequest &qr);
85  HR_EXTERN std::vector<unsigned char> base_query_char(std::vector<unsigned char> qr, size_t size);
86 
87  HR_EXTERN void base_addClientListener(ClientListener &clientListener, const std::vector<std::vector<char> > filterFactoryParam, const std::vector<std::vector<char> > converterFactoryParams, const std::function<void()> &recoveryCallback);
88  HR_EXTERN void base_removeClientListener(ClientListener &clientListener);
89  HR_EXTERN void putScript(const std::vector<char>& name, const std::vector<char>& script);
90 
91  HR_EXTERN RemoteCacheBase(TransactionManager& tm, TransactionTable& tt, bool forceReturnValue, bool transactional) : transactionManager(tm), transactionTable(tt), forceReturnValue(forceReturnValue), transactional(transactional) {}
92 
93 
94  HR_EXTERN void setMarshallers(void* rc, MarshallHelperFn kf, MarshallHelperFn vf, UnmarshallHelperFn ukf,
95  UnmarshallHelperFn uvf);
96  HR_EXTERN void setRemoteCachePtr(void* ptr);
97 #ifndef SWIGCSHARP
98  template<class K, class V, typename... Params>
99  void base_addContinuousQueryListener(ContinuousQueryListener<K, V, Params...>& cql) {
100  static char CONTINUOUS_QUERY_FILTER_FACTORY_NAME[] =
101  "continuous-query-filter-converter-factory";
102  cql.cl.filterFactoryName = std::vector<char>(
103  CONTINUOUS_QUERY_FILTER_FACTORY_NAME,
104  CONTINUOUS_QUERY_FILTER_FACTORY_NAME
105  + strlen(CONTINUOUS_QUERY_FILTER_FACTORY_NAME));
106  cql.cl.converterFactoryName = std::vector<char>(
107  CONTINUOUS_QUERY_FILTER_FACTORY_NAME,
108  CONTINUOUS_QUERY_FILTER_FACTORY_NAME
109  + strlen(CONTINUOUS_QUERY_FILTER_FACTORY_NAME));
110  BasicTypesProtoStreamMarshaller<std::string> paramMarshaller;
111  std::vector<std::vector<char> > filterFactoryParams;
112  std::vector<char> param;
113  paramMarshaller.marshall(cql.getQuery(), param);
114  filterFactoryParams.push_back(param);
115  std::vector<std::vector<char> > converterFactoryParams;
116  cql.cl.useRawData = true;
117  cql.listenerCustomEvent =
118  [this, &cql](ClientCacheEntryCustomEvent e) {
119  ContinuousQueryResult r;
120  WrappedMessage wm;
121  wm.ParseFromArray(e.getEventData().data(), e.getEventData().size());
122  r.ParseFromString(wm.wrappedmessagebytes());
123  auto resultType = r.resulttype();
124  auto i=0;
125  std::tuple<Params...> tp= popTuple<Params...>(r.projection(), i) ;
126  K* k = (K*)this->baseKeyUnmarshall(std::vector<char>(r.key().begin(), r.key().end()));
127  switch (resultType) {
128  case ContinuousQueryResult_ResultType_JOINING:
129  cql.getJoiningListener()(*k, tp);
130  break;
131  case ContinuousQueryResult_ResultType_LEAVING:
132  cql.getLeavingListener()(*k, tp);
133  break;
134  case ContinuousQueryResult_ResultType_UPDATED:
135  cql.getUpdatedListener()(*k, tp);
136  break;
137  default:
138  //uIgnore unknown types
139  break;
140  }
141  };
142  cql.cl.add_listener(cql.listenerCustomEvent);
143  this->base_addClientListener(cql.cl, filterFactoryParams,
144  converterFactoryParams, cql.getFailoverListener());
145  }
146  template <class K, class V>
147  void base_addContinuousQueryListener(ContinuousQueryListener<K, V>& cql) {
148  static char CONTINUOUS_QUERY_FILTER_FACTORY_NAME[] =
149  "continuous-query-filter-converter-factory";
150  cql.cl.filterFactoryName = std::vector<char>(
151  CONTINUOUS_QUERY_FILTER_FACTORY_NAME,
152  CONTINUOUS_QUERY_FILTER_FACTORY_NAME
153  + strlen(CONTINUOUS_QUERY_FILTER_FACTORY_NAME));
154  cql.cl.converterFactoryName = std::vector<char>(
155  CONTINUOUS_QUERY_FILTER_FACTORY_NAME,
156  CONTINUOUS_QUERY_FILTER_FACTORY_NAME
157  + strlen(CONTINUOUS_QUERY_FILTER_FACTORY_NAME));
158  BasicTypesProtoStreamMarshaller<std::string> paramMarshaller;
159  std::vector<std::vector<char> > filterFactoryParams;
160  std::vector<char> param;
161  paramMarshaller.marshall(cql.getQuery(), param);
162  filterFactoryParams.push_back(param);
163  std::vector<std::vector<char> > converterFactoryParams;
164  cql.cl.useRawData = true;
165  cql.listenerCustomEvent =
166  [this, &cql](ClientCacheEntryCustomEvent e) {
167  ContinuousQueryResult r;
168  WrappedMessage wm;
169  wm.ParseFromArray(e.getEventData().data(), e.getEventData().size());
170  r.ParseFromString(wm.wrappedmessagebytes());
171  auto resultType = r.resulttype();
172  K* k = (K*)this->baseKeyUnmarshall(std::vector<char>(r.key().begin(), r.key().end()));
173  V* v = (V*)this->baseValueUnmarshall(std::vector<char>(r.value().begin(), r.value().end()));
174  switch (resultType) {
175  case ContinuousQueryResult_ResultType_JOINING:
176  cql.getJoiningListener()(*k, *v);
177  break;
178  case ContinuousQueryResult_ResultType_LEAVING:
179  cql.getLeavingListener()(*k, *v);
180  break;
181  case ContinuousQueryResult_ResultType_UPDATED:
182  cql.getUpdatedListener()(*k, *v);
183  break;
184  default:
185  //uIgnore unknown types
186  break;
187  }
188  };
189  cql.cl.add_listener(cql.listenerCustomEvent);
190  this->base_addClientListener(cql.cl, filterFactoryParams,
191  converterFactoryParams, cql.getFailoverListener());
192  }
193 #endif
194 
195  void setValueCopyConstructor(ValueCopyConstructHelperFn valueCopyConstrunctor) {
196  this->valueCopyConstructor = valueCopyConstrunctor;
197  }
198 
199 
200  void setValueDestructor(ValueDestructorHelperFn valueDestructor) {
201  this->valueDestructor = valueDestructor;
202  }
203 
204  void setValueMarshaller(const ValueMarshallerHelperFn& valueMarshaller = nullptr)
205  {
206  this->valueMarshallerFn = valueMarshaller;
207  }
208 
209  HR_EXTERN void cloneImplWithDataFormat(EntryMediaTypes *df);
210  TransactionManager &transactionManager;
211  TransactionTable &transactionTable;
212  ValueCopyConstructHelperFn valueCopyConstructor = nullptr;
213  ValueDestructorHelperFn valueDestructor = nullptr;
214  ValueMarshallerHelperFn valueMarshallerFn = nullptr;
215  void *remoteCachePtr=nullptr; // TODO: pointer to self, is it really necessary?
216  bool forceReturnValue;
217  bool transactional;
218 
219 private:
220  std::shared_ptr<RemoteCacheImpl> impl; // pointer to RemoteCacheImpl;
221  MarshallHelperFn baseKeyMarshallFn;
222  MarshallHelperFn baseValueMarshallFn;
223  HR_EXTERN void baseKeyMarshall(const void* k, std::vector<char> &buf);
224  HR_EXTERN void baseValueMarshall(const void* v, std::vector<char> &buf);
225 
226  UnmarshallHelperFn baseKeyUnmarshallFn;
227  UnmarshallHelperFn baseValueUnmarshallFn;
228  HR_EXTERN void* baseKeyUnmarshall(const std::vector<char> &buf);
229  HR_EXTERN void* baseValueUnmarshall(const std::vector<char> &buf);
230  void* transactional_base_get(Transaction& currentTransaction, const void* key);
231  void* transactional_base_put(Transaction& currentTransaction, const void* key, const void* val, int64_t life,
232  int64_t idle, bool forceRV);
233 
234 
235 friend class RemoteCacheManager;
236 friend class RemoteCacheImpl;
237 friend class NearRemoteCacheImpl;
238 friend class KeyUnmarshallerFtor;
239 friend class ValueUnmarshallerFtor;
240 template <class K, class V>
241 friend class ::infinispan::hotrod::event::CacheClientListener;
242 #ifndef SWIGCSHARP
243 template <typename... Params>
244 friend class ::infinispan::hotrod::event::ContinuousQueryListener;
245 template <class M> friend class RemoteExecution;
246 #endif
247 };
248 
249 }} // namespace
250 
251 #endif /* ISPN_HOTROD_REMOTECACHEBASE_H */
Definition: Transactions.h:21
Definition: Transactions.h:174
void(* MarshallHelperFn)(void *, const void *, std::vector< char > &)
Definition: RemoteCacheBase.h:39
Definition: Transactions.h:73
#define HR_EXTERN
Definition: ImportExport.h:35
std::function< void(const void *, std::vector< char > &)> ValueMarshallerHelperFn
Definition: RemoteCacheBase.h:43
Definition: TransactionManager.h:24
std::function< void *(const void *)> ValueCopyConstructHelperFn
Definition: RemoteCacheBase.h:41
Definition: DataFormat.h:79
std::tuple< H, Params...> popTuple(const RepeatedPtrField< WrappedMessage > &wMsgs, int &k)
Definition: QueryUtils.h:94
Definition: RemoteCacheManager.h:43
Definition: ClientListener.h:35
void *(* UnmarshallHelperFn)(void *, const std::vector< char > &)
Definition: RemoteCacheBase.h:40
Definition: CacheClientListener.h:33
std::function< void(const void *)> ValueDestructorHelperFn
Definition: RemoteCacheBase.h:42
Definition: ContinuousQueryListener.h:34
Definition: MetadataValue.h:9
Flag
Definition: Flag.h:7
Definition: Transactions.h:130
Definition: RemoteExecution.h:29
Definition: VersionedValue.h:9