0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036
0037 #include <stdio.h>
0038 #include <stdlib.h>
0039 #include <string.h>
0040 #include <errno.h>
0041 #include <regex.h>
0042 #include <utlist.h>
0043
0044 #include <ndebug.h>
0045 #include <atmi.h>
0046 #include <sys_unix.h>
0047 #include <atmi_int.h>
0048 #include <typed_buf.h>
0049 #include <ndrstandard.h>
0050 #include <ubf.h>
0051 #include <Exfields.h>
0052 #include <atmi_shm.h>
0053 #include <exregex.h>
0054 #include "tpevsv.h"
0055
0056
0057
0058
0059
0060 exprivate event_entry_t *M_subscribers=NULL;
0061
0062
0063 exprivate NDRX_RWLOCK_DECL(M_subscribers_lock);
0064
0065
0066
0067
0068
0069
0070
0071
0072
0073 exprivate long remove_by_my_id (long subscription, char *my_id)
0074 {
0075 event_entry_t *elt, *tmp;
0076 long deleted = 0;
0077
0078
0079 DL_FOREACH_SAFE(M_subscribers,elt,tmp)
0080 {
0081 NDRX_LOG(log_debug, "Checking Nr: %d, my_id: %s",
0082 elt->subscriberNr, elt->my_id);
0083
0084 if ((-1==subscription &&
0085 !(elt->flags & TPEVPERSIST) &&
0086 (NULL==my_id || (0==strcmp(elt->my_id, my_id)))
0087 ) ||
0088 subscription==elt->subscriberNr
0089 )
0090 {
0091 NDRX_LOG(log_debug, "Removing subscription %ld", subscription);
0092
0093 ndrx_regfree(&elt->re);
0094
0095 DL_DELETE(M_subscribers,elt);
0096 NDRX_FPFREE(elt);
0097 deleted++;
0098 }
0099
0100 }
0101
0102 return deleted;
0103 }
0104
0105
0106
0107
0108
0109
0110 exprivate int compile_eventexpr(event_entry_t *p_ee)
0111 {
0112 return ndrx_regcomp(&(p_ee->re), p_ee->eventexpr);
0113 }
0114
0115
0116
0117
0118
0119
0120 exprivate void process_postage(TPSVCINFO *p_svc, int dispatch_over_bridges)
0121 {
0122 int ret=EXSUCCEED;
0123 char *data = p_svc->data;
0124 event_entry_t *elt, *tmp;
0125 long numdisp = 0;
0126 char tmpsvc[MAXTIDENT+1];
0127 char buf_type[9];
0128 char buf_subtype[17];
0129 long buf_len;
0130 long flags;
0131 int locked = EXFALSE;
0132 tp_command_call_t * last_call;
0133 buffer_obj_t *bo;
0134
0135
0136 string_hash_t *dup_chk = NULL;
0137
0138 memset(buf_type, 0, sizeof(buf_type));
0139 memset(buf_subtype, 0, sizeof(buf_subtype));
0140
0141 NDRX_LOG(log_debug, "process_postage got call");
0142
0143 if (NULL!=data)
0144 {
0145 buf_len = tptypes(data, buf_type, buf_subtype);
0146
0147 if (strcmp(buf_type, BUF_TYPE_UBF_STR) &&
0148 debug_get_ndrx_level() > log_debug)
0149 {
0150 Bfprint((UBFH *)data, stderr);
0151 }
0152 }
0153
0154 last_call=ndrx_get_G_last_call();
0155
0156 NDRX_LOG(log_debug, "Posting event [%s] to system", last_call->extradata);
0157
0158
0159
0160
0161 NDRX_RWLOCK_RLOCK_V(M_subscribers_lock);
0162 locked=EXTRUE;
0163
0164 DL_FOREACH_SAFE(M_subscribers,elt,tmp)
0165 {
0166
0167 typed_buffer_descr_t *descr;
0168
0169
0170 bo = ndrx_find_buffer(p_svc->data);
0171
0172 descr = &G_buf_descr[bo->type_id];
0173
0174 NDRX_LOG(log_debug, "Checking Nr: %d, event [%s]",
0175 elt->subscriberNr, elt->eventexpr);
0176
0177
0178 if (EXSUCCEED==regexec(&elt->re, last_call->extradata, (size_t) 0, NULL, 0))
0179 {
0180 NDRX_LOG(log_debug, "Event matched");
0181
0182
0183 if (EXEOS!=elt->filter[0])
0184 {
0185 NDRX_LOG(log_debug, "Using filter: [%s]", elt->filter);
0186 }
0187
0188
0189 if ((EXEOS!=elt->filter[0] &&
0190 descr->pf_test(descr, p_svc->data, p_svc->len, elt->filter)) ||
0191 EXEOS==elt->filter[0])
0192 {
0193 NDRX_LOG(log_debug, "Dispatching event");
0194 if (elt->flags & TPEVSERVICE)
0195 {
0196 int err;
0197 NDRX_LOG(log_debug, "Calling service %s/%s in async mode",
0198 elt->name1, elt->my_id);
0199
0200
0201
0202
0203 if (ndrx_string_hash_get(dup_chk, elt->name1))
0204 {
0205 NDRX_LOG(log_debug, "Service already called: [%s] - skip dup",
0206 elt->name1);
0207 continue;
0208 }
0209
0210 flags = p_svc->flags | TPNOREPLY;
0211 NDRX_LOG(log_debug, "Calling service %s/%s in async mode flags: 0x%lx (2)",
0212 elt->name1, elt->my_id, flags);
0213
0214 if (EXFAIL==(err=tpacallex (elt->name1, p_svc->data, p_svc->len,
0215 flags, last_call->extradata,
0216 EXFAIL, EXTRUE,
0217
0218 last_call->rval, last_call->rcode,
0219 last_call->user3, last_call->user4)))
0220 {
0221 if (tperrno!=TPEBLOCK)
0222 {
0223 NDRX_LOG(log_error, "Failed to call service [%s/%s]: %s"
0224 " - unsubscribing %ld",
0225 elt->name1, elt->my_id,
0226 tpstrerror(tperrno), elt->subscriberNr);
0227
0228
0229
0230
0231 NDRX_RWLOCK_UNLOCK_V(M_subscribers_lock);
0232 NDRX_RWLOCK_WLOCK_V(M_subscribers_lock);
0233
0234 remove_by_my_id(elt->subscriberNr, NULL);
0235 }
0236 else
0237 {
0238 NDRX_LOG(log_error, "TPEBLOCK during call "
0239 "of service [%s/%s] subscr: %ld - skip",
0240 elt->name1, elt->my_id, elt->subscriberNr);
0241 }
0242 }
0243 else
0244 {
0245
0246 if (NULL==ndrx_string_hash_add(&dup_chk, elt->name1))
0247 {
0248 NDRX_LOG(log_error, "Failed to add service [%s] to "
0249 "dup hash list!", elt->name1);
0250 EXFAIL_OUT(ret);
0251 }
0252
0253 numdisp++;
0254
0255 if (err)
0256 {
0257 tpcancel(err);
0258 }
0259 }
0260 }
0261 else
0262 {
0263 NDRX_LOG(log_debug, "Skipping subscriber due to "
0264 "unsupported event delivery mechanism!");
0265 }
0266 }
0267 else
0268 {
0269 NDRX_LOG(log_debug, "Not dispatching event due to filter");
0270 }
0271 }
0272 }
0273
0274 NDRX_RWLOCK_UNLOCK_V(M_subscribers_lock);
0275 locked=EXFALSE;
0276
0277 if (dispatch_over_bridges)
0278 {
0279 char nodes[CONF_NDRX_NODEID_COUNT+1] = {EXEOS};
0280 int i = 0;
0281 long olen;
0282 NDRX_LOG(log_debug, "Dispatching events over the bridges...!");
0283 if (EXSUCCEED==ndrx_shm_birdge_getnodesconnected(nodes))
0284 {
0285 while (nodes[i])
0286 {
0287 int nodeid = nodes[i];
0288 char *tmp_data = NULL;
0289
0290 if (NULL!=tmp_data)
0291 {
0292 if (buf_len < 0)
0293 {
0294 NDRX_LOG(log_error, "Invalid buffer type!");
0295 break;
0296 }
0297
0298 tmp_data = tpalloc(buf_type,
0299 (buf_subtype[0]==EXEOS?NULL:buf_subtype), buf_len);
0300 }
0301 else
0302 {
0303
0304 tmp_data = tpalloc(BUF_TYPE_UBF_STR, NULL, 1024);
0305 }
0306
0307 if (NULL==tmp_data)
0308 {
0309 NDRX_LOG(log_error, "Cannot deliver event to node %c"
0310 " - tpalloc failed for dest buffer...");
0311 break;
0312 }
0313
0314
0315 snprintf(tmpsvc, sizeof(tmpsvc), NDRX_SYS_SVC_PFX EV_TPEVDOPOST,
0316 (short)nodeid);
0317
0318
0319 flags = (p_svc->flags & ~TPNOREPLY);
0320
0321 if (EXFAIL==(tpcallex (tmpsvc, p_svc->data, p_svc->len,
0322 &tmp_data, &olen,
0323 flags, last_call->extradata, nodeid, TPCALL_BRCALL,
0324
0325 last_call->rval, last_call->rcode,
0326 last_call->user3, last_call->user4)))
0327 {
0328 NDRX_LOG(log_error, "Call bridge %d: [%s]: %s",
0329 nodeid, tmpsvc, tpstrerror(tperrno));
0330 }
0331 else
0332 {
0333 NDRX_LOG(log_debug, "On node %d applied %d events",
0334 nodeid, tpurcode);
0335 numdisp+=tpurcode;
0336 }
0337
0338 if (tmp_data)
0339 {
0340 tpfree(tmp_data);
0341 }
0342
0343 i++;
0344 }
0345 }
0346 }
0347
0348 out:
0349
0350 if (locked)
0351 {
0352 NDRX_RWLOCK_UNLOCK_V(M_subscribers_lock);
0353 }
0354
0355 if (NULL!=dup_chk)
0356 {
0357 ndrx_string_hash_free(dup_chk);
0358 }
0359
0360 tpreturn( ret==EXSUCCEED?TPSUCCESS:TPFAIL,
0361 numdisp,
0362 NULL,
0363 0L,
0364 0L);
0365 }
0366
0367
0368
0369
0370
0371
0372
0373
0374
0375
0376
0377 void TPEVDOPOST(TPSVCINFO *p_svc)
0378 {
0379 process_postage(p_svc, EXFALSE);
0380 }
0381
0382
0383
0384
0385
0386
0387
0388
0389
0390 void TPEVPOST (TPSVCINFO *p_svc)
0391 {
0392
0393 process_postage(p_svc, EXTRUE);
0394 }
0395
0396
0397
0398
0399
0400
0401
0402
0403 void TPEVUNSUBS (TPSVCINFO *p_svc)
0404 {
0405 int ret=EXSUCCEED;
0406 UBFH *p_ub = (UBFH *)p_svc->data;
0407 long subscriberNr = 0;
0408 long deleted = 0;
0409
0410 NDRX_LOG(log_debug, "EX_EVUNSUBS got call");
0411 Bfprint(p_ub, stderr);
0412
0413
0414
0415 if (EXFAIL==CBget(p_ub, EV_SUBSNR, 0, (char *)&subscriberNr, NULL, BFLD_LONG))
0416 {
0417 NDRX_LOG(log_error, "Failed to get EV_SUBSNR/subscriberNr: %s",
0418 Bstrerror(Berror));
0419 ret=EXFAIL;
0420 goto out;
0421 }
0422 NDRX_LOG(log_debug, "About to remove subscription: %ld, my_id: %s",
0423 subscriberNr, ndrx_get_G_last_call()->my_id);
0424
0425 NDRX_RWLOCK_WLOCK_V(M_subscribers_lock);
0426 deleted=remove_by_my_id(subscriberNr, ndrx_get_G_last_call()->my_id);
0427 NDRX_RWLOCK_UNLOCK_V(M_subscribers_lock);
0428
0429 out:
0430 tpreturn( ret==EXSUCCEED?TPSUCCESS:TPFAIL,
0431 deleted,
0432 NULL,
0433 0L,
0434 0L);
0435 }
0436
0437
0438
0439
0440
0441
0442
0443
0444
0445
0446
0447
0448 void TPEVSUBS (TPSVCINFO *p_svc)
0449 {
0450 int ret=EXSUCCEED;
0451 UBFH *p_ub = (UBFH *)p_svc->data;
0452 event_entry_t *p_ee;
0453 BFLDLEN len;
0454 static long subscriberNr = 0;
0455
0456 NDRX_LOG(log_debug, "TPEVSUBS got call");
0457 Bfprint(p_ub, stderr);
0458
0459 if (NULL==(p_ee=NDRX_FPMALLOC(sizeof(event_entry_t), 0)))
0460 {
0461 NDRX_LOG(log_error, "Failed to allocate %d bytes: %s!",
0462 sizeof(event_entry_t), strerror(errno));
0463 ret=EXFAIL;
0464 goto out;
0465 }
0466
0467 memset((char *)p_ee, 0, sizeof(event_entry_t));
0468
0469 NDRX_STRCPY_SAFE(p_ee->my_id, ndrx_get_G_last_call()->my_id);
0470 len=sizeof(p_ee->eventexpr);
0471 if (Bpres(p_ub, EV_MASK, 0) && EXFAIL==Bget(p_ub, EV_MASK, 0,
0472 p_ee->eventexpr, &len))
0473 {
0474 NDRX_LOG(log_error, "Failed to get EV_MASK/eventexpr: %s",
0475 Bstrerror(Berror));
0476 ret=EXFAIL;
0477 goto out;
0478 }
0479
0480 len=sizeof(p_ee->filter);
0481 if (Bpres(p_ub, EV_FILTER, 0) && EXFAIL==Bget(p_ub, EV_FILTER, 0,
0482 p_ee->filter, &len))
0483 {
0484 NDRX_LOG(log_error, "Failed to get EV_FILTER/filter: %s",
0485 Bstrerror(Berror));
0486 ret=EXFAIL;
0487 goto out;
0488 }
0489
0490
0491 if (EXFAIL==CBget(p_ub, EV_FLAGS, 0, (char *)&p_ee->flags, NULL, BFLD_LONG))
0492 {
0493 NDRX_LOG(log_error, "Failed to get EV_FLAGS/flags: %s",
0494 Bstrerror(Berror));
0495 ret=EXFAIL;
0496 goto out;
0497 }
0498
0499
0500 if (p_ee->flags & TPEVSERVICE)
0501 {
0502 len=sizeof(p_ee->name1);
0503 if (EXSUCCEED!=CBget(p_ub, EV_SRVCNM, 0, p_ee->name1, &len, BFLD_STRING))
0504 {
0505 NDRX_LOG(log_error, "Failed to get EV_SRVCNM/name1: %s",
0506 Bstrerror(Berror));
0507 ret=EXFAIL;
0508 goto out;
0509 }
0510 }
0511 else
0512 {
0513 NDRX_LOG(log_error, "tpsubscribe() unsupported flags: %ld",
0514 p_ee->flags);
0515 ret=EXFAIL;
0516 goto out;
0517 }
0518
0519
0520 if (EXSUCCEED!=compile_eventexpr(p_ee))
0521 {
0522 ret=EXFAIL;
0523 goto out;
0524 }
0525 p_ee->subscriberNr=subscriberNr;
0526
0527
0528 NDRX_LOG(log_debug, "Nr: %ld, event [%s], filter [%s], name1 [%s], flags [%ld]",
0529 p_ee->subscriberNr, p_ee->eventexpr, p_ee->filter,
0530 p_ee->name1, p_ee->flags);
0531 subscriberNr++;
0532
0533
0534 NDRX_RWLOCK_WLOCK_V(M_subscribers_lock);
0535 DL_APPEND(M_subscribers, p_ee);
0536 NDRX_RWLOCK_UNLOCK_V(M_subscribers_lock);
0537 out:
0538 tpreturn( ret==EXSUCCEED?TPSUCCESS:TPFAIL,
0539 p_ee->subscriberNr,
0540 NULL,
0541 0L,
0542 0L);
0543 }
0544
0545
0546
0547
0548 int tpsvrinit(int argc, char **argv)
0549 {
0550 int ret=EXSUCCEED;
0551 short nodeid = (short)tpgetnodeid();
0552 char tmpsvc[MAXTIDENT+1];
0553
0554 NDRX_LOG(log_debug, "tpsvrinit called");
0555
0556 snprintf(tmpsvc, sizeof(tmpsvc), NDRX_SYS_SVC_PFX EV_TPEVSUBS, nodeid);
0557 if (EXSUCCEED!=tpadvertise(tmpsvc, TPEVSUBS))
0558 {
0559 NDRX_LOG(log_error, "Failed to initialize TPEVSUBS!");
0560 ret=EXFAIL;
0561 goto out;
0562 }
0563
0564 snprintf(tmpsvc, sizeof(tmpsvc), NDRX_SYS_SVC_PFX EV_TPEVUNSUBS, nodeid);
0565 if (EXSUCCEED!=tpadvertise(tmpsvc, TPEVUNSUBS))
0566 {
0567 NDRX_LOG(log_error, "Failed to initialize TPEVUNSUBS!");
0568 ret=EXFAIL;
0569 goto out;
0570 }
0571
0572 snprintf(tmpsvc, sizeof(tmpsvc), NDRX_SYS_SVC_PFX EV_TPEVPOST, nodeid);
0573 if (EXSUCCEED!=tpadvertise(tmpsvc, TPEVPOST))
0574 {
0575 NDRX_LOG(log_error, "Failed to initialize TPEVPOST!");
0576 ret=EXFAIL;
0577 goto out;
0578 }
0579
0580 snprintf(tmpsvc, sizeof(tmpsvc), NDRX_SYS_SVC_PFX EV_TPEVDOPOST, nodeid);
0581 if (EXSUCCEED!=tpadvertise(tmpsvc, TPEVDOPOST))
0582 {
0583 NDRX_LOG(log_error, "Failed to initialize TPEVDOPOST!");
0584 ret=EXFAIL;
0585 goto out;
0586 }
0587
0588 out:
0589 return ret;
0590 }
0591
0592 void tpsvrdone (void)
0593 {
0594
0595 }
0596
0597
0598 expublic struct tmdsptchtbl_t ndrx_G_tmdsptchtbl[] = {
0599 { NULL, NULL, NULL, 0, 0 }
0600 };
0601
0602
0603
0604
0605 int main( int argc, char** argv )
0606 {
0607 _tmbuilt_with_thread_option=EXTRUE;
0608 struct tmsvrargs_t tmsvrargs =
0609 {
0610 &tmnull_switch,
0611 &ndrx_G_tmdsptchtbl[0],
0612 0,
0613 tpsvrinit,
0614 tpsvrdone,
0615 NULL,
0616 NULL,
0617 NULL,
0618 NULL,
0619 NULL,
0620 NULL,
0621 NULL
0622 };
0623
0624 return( _tmstartserver( argc, argv, &tmsvrargs ));
0625
0626 }
0627
0628