00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037 #include "ks_config.h"
00038 #define RECV_SIZE 65536
00039
00040 #ifdef HAVE_STRING_H
00041 #include <string.h>
00042 #endif
00043
00044 #ifdef HAVE_UNISTD_H
00045 #include <unistd.h>
00046 #endif
00047
00048 #ifdef HAVE_SYS_TYPES_H
00049 #include <sys/types.h>
00050 #endif
00051
00052 #ifdef HAVE_SYS_SOCKET_H
00053 #include <sys/socket.h>
00054 #endif
00055
00056 #ifdef HAVE_NETINET_IN_H
00057 #include <netinet/in.h>
00058 #endif
00059
00060 #ifdef HAVE_ARPA_INET_H
00061 #include <arpa/inet.h>
00062 #endif
00063
00064 #ifdef HAVE_NETDB_H
00065 #include <netdb.h>
00066 #endif
00067
00068
00069
00070
00071
00072
00073
00074
00075 #include <pthread.h>
00076
00077 #ifdef STDC_HEADERS
00078 #include <stdio.h>
00079 #include <stdlib.h>
00080 #include <errno.h>
00081 #endif
00082
00083 #ifdef HAVE_SYS_TIME_H
00084 # include <sys/time.h>
00085 # ifdef TIME_WITH_SYS_TIME
00086 # ifdef HAVE_TIME_H
00087 # include <sys/time.h>
00088 # endif
00089 # endif
00090 #else
00091 # ifdef HAVE_TIME_H
00092 # include <sys/time.h>
00093 # endif
00094 #endif
00095
00096 #ifdef HAVE_SYS_SELECT_H
00097 #include <sys/select.h>
00098 #endif
00099
00100 #include "ks_socket.h"
00101 #include "ks_malloc.h"
00102
00103 #define MSECS 1000
00104 #define USECS 1000000
00105 #define NSECS 1000000000
00106
00114 struct _ks_socket_t {
00115 char *hostname;
00116 char *port;
00117 int proto;
00119 struct sockaddr_in
00120 addr;
00121 int socket;
00122 pthread_mutex_t
00123 m_send;
00124 pthread_mutex_t
00125 m_recv;
00126 pthread_mutex_t
00127 m_cache;
00128 pthread_cond_t
00129 c_cache;
00131 ks_array_t cache;
00132 char *princ;
00133 char *creds;
00134
00135
00136
00137
00138 };
00139
00140 void
00141 ks_socket_set_auth(ks_socket_t *s,
00142 const char *principal, const char *credentials)
00143 {
00144 int len;
00145
00146 if (s->princ) {
00147 ks_free(s->princ);
00148 s->princ = NULL;
00149 }
00150
00151 if (s->creds) {
00152 ks_free(s->creds);
00153 s->creds = NULL;
00154 }
00155
00156 if (principal != NULL) {
00157 len = strlen(principal);
00158 s->princ = ks_malloc(len + 1);
00159 strcpy(s->princ, principal);
00160 }
00161
00162 if (credentials != NULL) {
00163 len = strlen(credentials);
00164 s->creds = ks_malloc(len + 1);
00165 strcpy(s->creds, credentials);
00166 }
00167 }
00168
00169 #define QUEUE_SIZE_MAX 64
00170
00179 static ks_response_t *
00180 ks_socket_cache_get(ks_socket_t *k, ks_string_t *id)
00181 {
00182 ks_response_t *r;
00183 int i;
00184
00185
00186
00187
00188 for (i = 0; i < ks_array_length(&(k->cache)); i++) {
00189 r = (ks_response_t *)ks_array_get(&(k->cache), i);
00190 if (ks_string_equals(ks_response_id(r), id)) {
00191 ks_array_remove(&(k->cache), i);
00192 return r;
00193 }
00194 }
00195
00196
00197
00198 return NULL;
00199 }
00200
00208 static void
00209 ks_socket_cache_add(ks_socket_t *k, ks_response_t *r)
00210 {
00211
00212
00213
00214 if (ks_array_length(&(k->cache)) >= QUEUE_SIZE_MAX)
00215 ks_array_remove(&(k->cache), 0);
00216 ks_array_add(&(k->cache), (ks_base_t *)r);
00217
00218
00219 }
00220
00227
00228
00229
00230
00231
00232 static int
00233 ks_socket_addr(ks_socket_t *k)
00234 {
00235 struct addrinfo hh;
00236 struct addrinfo *ad;
00237 struct addrinfo *ap;
00238 int ss;
00239 memset(&hh, 0, sizeof(struct addrinfo));
00240 hh.ai_family = AF_INET;
00241 if (k->proto == IPPROTO_TCP)
00242 hh.ai_socktype = SOCK_STREAM;
00243 else if (k->proto == IPPROTO_UDP)
00244 hh.ai_socktype = SOCK_DGRAM;
00245 else
00246 hh.ai_socktype = 0;
00247 hh.ai_protocol = k->proto;
00248 hh.ai_flags = AI_ADDRCONFIG;
00249 ss = getaddrinfo(k->hostname, k->port, &hh, &ad);
00250 if (ss != 0) {
00251 ks_error("getaddrinfo(%s, %s): %s",
00252 k->hostname, k->port, gai_strerror(ss));
00253 return ss;
00254 }
00255 for (ap = ad; ap; ap = ap->ai_next) {
00256 if (ap->ai_family == AF_INET) {
00257 memcpy(&(k->addr), ap->ai_addr, ap->ai_addrlen);
00258 freeaddrinfo(ad);
00259 return 0;
00260 }
00261 }
00262 freeaddrinfo(ad);
00263 return 1;
00264 }
00265
00274 ks_socket_t *
00275 ks_socket_new(const char *hostname, const char *port, int proto)
00276 {
00277 ks_socket_t *k;
00278
00279 if (hostname == NULL)
00280 hostname = KS_SLAVE_HOST_DEFAULT;
00281 if (port == NULL)
00282 port = KS_SLAVE_PORT_DEFAULT;
00283
00284 if (proto == IPPROTO_TCP)
00285 ;
00286 else if (proto == IPPROTO_UDP)
00287 ;
00288 else {
00289 ks_error("Unsupported protocol %d\n", proto);
00290 return NULL;
00291 }
00292
00293
00294 k = (ks_socket_t *)ks_malloc(sizeof(ks_socket_t));
00295 memset(k, 0, sizeof(ks_socket_t));
00296 k->hostname = (char *)ks_malloc(strlen(hostname) + 1);
00297 strcpy(k->hostname, hostname);
00298 k->port = (char *)ks_malloc(strlen(port) + 1);
00299 strcpy(k->port, port);
00300 k->proto = proto;
00301 k->socket = -1;
00302 k->princ = NULL;
00303 k->creds = NULL;
00304
00305 pthread_mutex_init(&(k->m_send), NULL);
00306 pthread_mutex_init(&(k->m_recv), NULL);
00307 pthread_mutex_init(&(k->m_cache), NULL);
00308 pthread_cond_init(&(k->c_cache), NULL);
00309
00310 ks_array_init(&(k->cache));
00311 return k;
00312 }
00313
00321 void
00322 ks_socket_free(ks_socket_t *k)
00323 {
00324 ks_socket_close(k);
00325 ks_free(k->hostname);
00326 ks_free(k->port);
00327
00328 pthread_mutex_destroy(&(k->m_send));
00329 pthread_mutex_destroy(&(k->m_recv));
00330 pthread_mutex_destroy(&(k->m_cache));
00331 pthread_cond_destroy(&(k->c_cache));
00332
00333 ks_array_fini(&(k->cache));
00334 if (k->princ)
00335 ks_free(k->princ);
00336 if (k->creds)
00337 ks_free(k->creds);
00338 ks_free(k);
00339 }
00340
00349
00350 int
00351 ks_socket_connect(ks_socket_t *k)
00352 {
00353 if (k->socket != -1)
00354 ks_socket_close(k);
00355
00356 if (ks_socket_addr(k) != 0)
00357 return -1;
00358 if (k->proto == IPPROTO_TCP) {
00359 k->socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
00360 }
00361 else if (k->proto == IPPROTO_UDP) {
00362 k->socket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
00363 }
00364 else {
00365
00366 return -1;
00367 }
00368
00369 if (k->socket == -1) {
00370 ks_error("ks_socket_connect: socket: %s", strerror(errno));
00371 return -1;
00372 }
00373 if (connect(k->socket,
00374 (struct sockaddr *)&(k->addr),
00375 sizeof(struct sockaddr_in)) < 0) {
00376 ks_error("ks_socket_connect: connect: %s", strerror(errno));
00377 return -1;
00378 }
00379
00380 return 0;
00381 }
00382
00389 void
00390 ks_socket_close(ks_socket_t *k)
00391 {
00392 if (k->socket != -1)
00393 close(k->socket);
00394 k->socket = -1;
00395 }
00396
00397 static int
00398 ks_socket_send_real(ks_socket_t *k, void *buf, int len) {
00399 int ret;
00400 int off = 0;
00401
00402 do {
00403 ret = write(k->socket, buf + off, len - off);
00404 if (ret < 0) {
00405 ks_error("ks_socket_send: write: %s", strerror(errno));
00406 ks_socket_close(k);
00407 return ret;
00408 }
00409 off += ret;
00410 } while (off < len);
00411
00412 return off;
00413 }
00414
00423 ssize_t
00424 ks_socket_send(ks_socket_t *k, ks_bquery_t *q)
00425 {
00426 void *buf;
00427 ks_string_t *r;
00428 uint32_t len;
00429 ssize_t ret = -1;
00430
00431 if (k->socket == -1) {
00432 ret = ks_socket_connect(k);
00433 if (ret < 0)
00434 return ret;
00435 }
00436
00437 if (k->princ != NULL) {
00438 int authed = ks_bquery_has_auth(q);
00439
00440 if (!authed)
00441 ks_bquery_set_auth(q, k->princ, k->creds);
00442 }
00443
00444
00445 r = ks_bquery_packet(q);
00446
00447 pthread_mutex_lock(&(k->m_send));
00448
00449 if (k->proto == IPPROTO_TCP) {
00450
00451
00452
00453 buf = ks_malloc(4 + ks_string_length(r));
00454
00455 len = htonl(ks_string_length(r));
00456 memcpy(buf, &len, 4);
00457 memcpy(buf + 4, ks_string_get(r), ks_string_length(r));
00458 ret = ks_socket_send_real(k, buf, 4 + ks_string_length(r));
00459
00460
00461 ks_free(buf);
00462
00463 }
00464 else if (k->proto == IPPROTO_UDP) {
00465
00466
00467
00468 ret = sendto(k->socket,
00469 ks_string_get(r), ks_string_length(r),
00470 0,
00471
00472
00473 NULL, 0);
00474 if (ret < 0) {
00475 ks_error("ks_socket_send: sendto: %s", strerror(errno));
00476 ks_socket_close(k);
00477 }
00478 }
00479
00480 pthread_mutex_unlock(&(k->m_send));
00481
00482 ks_string_free(r);
00483
00484 return ret;
00485 }
00486
00487 static int
00488 ks_socket_tv_subtract(struct timeval *out,
00489 const struct timeval *x, struct timeval *y)
00490 {
00491
00492 if (x->tv_usec < y->tv_usec) {
00493 int nsec = (y->tv_usec - x->tv_usec) / USECS + 1;
00494 y->tv_usec -= USECS * nsec;
00495 y->tv_sec += nsec;
00496 }
00497 if (x->tv_usec - y->tv_usec > USECS) {
00498 int nsec = (x->tv_usec - y->tv_usec) / USECS;
00499 y->tv_usec += USECS * nsec;
00500 y->tv_sec -= nsec;
00501 }
00502
00503
00504
00505 out->tv_sec = x->tv_sec - y->tv_sec;
00506 out->tv_usec = x->tv_usec - y->tv_usec;
00507
00508
00509 return x->tv_sec < y->tv_sec;
00510 }
00511
00521 static ks_response_t *
00522 ks_socket_recv_real(ks_socket_t *k, const struct timeval *end)
00523 {
00524 char buffer[RECV_SIZE];
00525 uint32_t len;
00526 ssize_t ret;
00527 uint32_t off;
00528
00529 struct timeval now;
00530 struct timeval timeout;
00531 fd_set rfds;
00532
00533 if (k->socket == -1) {
00534 ks_error("Cannot receive from closed/invalid socket");
00535 return NULL;
00536 }
00537
00538
00539
00540
00541 {
00542 gettimeofday(&now, NULL);
00543 if (ks_socket_tv_subtract(&timeout, end, &now)) {
00544 ks_error("ks_socket_recv: timeout on socket %d", k->socket);
00545 return NULL;
00546 }
00547 FD_ZERO(&rfds);
00548 FD_SET(k->socket, &rfds);
00549 select(k->socket + 1, &rfds, NULL, NULL, &timeout);
00550 if (!FD_ISSET(k->socket, &rfds)) {
00551 ks_error("ks_socket_recv: timeout on socket %d", k->socket);
00552 return NULL;
00553 }
00554 }
00555
00556 if (k->proto == IPPROTO_TCP) {
00557 len = 1;
00558 for (off = 0; off < 4; off += ret) {
00559 ret = read(k->socket, &(buffer[off]), 4 - off);
00560 if (ret < 0) {
00561 ks_error("ks_socket_recv: read: %s", strerror(errno));
00562 ks_socket_close(k);
00563 len = 0;
00564 break;
00565 }
00566 }
00567 if (len > 0) {
00568 len = ntohl(*(uint32_t *)buffer);
00569 if (len > RECV_SIZE || len < 2) {
00570 ks_error("Cannot receive packet: bad size %u", len);
00571 ks_socket_close(k);
00572 len = 0;
00573 }
00574 else {
00575 for (off = 0; off < len; off += ret) {
00576 ret = read(k->socket, &(buffer[off]), len - off);
00577 if (ret < 0) {
00578 ks_error("ks_socket_recv: read: %s",
00579 strerror(errno));
00580 ks_socket_close(k);
00581 len = 0;
00582 break;
00583 }
00584 }
00585 }
00586 }
00587 }
00588 else if (k->proto == IPPROTO_UDP) {
00589 ret = recvfrom(k->socket, buffer,
00590 RECV_SIZE, 0,
00591 NULL, NULL);
00592 if (ret < 0) {
00593 ks_error("ks_socket_recv: recvfrom: %s", strerror(errno));
00594 ks_socket_close(k);
00595 len = 0;
00596 }
00597 else {
00598 len = ret;
00599 }
00600 }
00601 else {
00602 len = 0;
00603 }
00604
00605
00606
00607 if (len == 0)
00608 return NULL;
00609
00610 return ks_response_bparse(buffer, len);
00611 }
00612
00613
00614
00615
00616
00617
00618
00619
00620
00621
00622
00623
00624
00625
00626
00627
00628
00629
00630
00631
00632
00633
00634
00635
00636
00646
00647 ks_response_t *
00648 ks_socket_recv(ks_socket_t *k, ks_string_t *id, int timeout)
00649 {
00650 ks_response_t *r;
00651 ks_string_t *rid;
00652
00653 if (timeout <= 0)
00654 timeout = KS_TIMEOUT_DEFAULT;
00655
00656 struct timeval tv;
00657 struct timespec ts;
00658
00659 {
00660 gettimeofday(&tv, NULL);
00661 tv.tv_usec += timeout * (USECS/MSECS);
00662 tv.tv_sec += (tv.tv_usec / USECS);
00663 tv.tv_usec %= USECS;
00664
00665 ts.tv_sec = tv.tv_sec;
00666 ts.tv_nsec = tv.tv_usec * (NSECS/USECS);
00667 }
00668
00669 pthread_mutex_lock(&(k->m_cache));
00670 for (;;) {
00671 r = ks_socket_cache_get(k, id);
00672 if (r != NULL) {
00673 pthread_mutex_unlock(&(k->m_cache));
00674 return r;
00675 }
00676
00677
00678
00679
00680 if (pthread_mutex_trylock(&(k->m_recv)) != EBUSY) {
00681
00682
00683 pthread_mutex_unlock(&(k->m_cache));
00684 for (;;) {
00685
00686
00687 r = ks_socket_recv_real(k, &tv);
00688 if (r == NULL)
00689 goto recv_done;
00690 rid = ks_response_id(r);
00691 if (rid == NULL) {
00692
00693 ks_error("ks_socket_recv: received packet has no id");
00694 goto recv_done;
00695 }
00696 if (ks_string_equals(rid, id))
00697 goto recv_done;
00698 pthread_mutex_lock(&(k->m_cache));
00699 ks_socket_cache_add(k, r);
00700 pthread_cond_broadcast(&(k->c_cache));
00701 pthread_mutex_unlock(&(k->m_cache));
00702 }
00703 }
00704 if (pthread_cond_timedwait(&(k->c_cache), &(k->m_cache), &ts)
00705 == ETIMEDOUT)
00706 return NULL;
00707 }
00708
00709 recv_done:
00710
00711 pthread_mutex_unlock(&(k->m_recv));
00712 pthread_mutex_lock(&(k->m_cache));
00713 pthread_cond_broadcast(&(k->c_cache));
00714 pthread_mutex_unlock(&(k->m_cache));
00715 return r;
00716 }
00717
00733 ks_response_t *
00734 ks_socket_ask(ks_socket_t *k, ks_bquery_t *q, int timeout)
00735
00736 {
00737 ks_response_t *r;
00738 int i;
00739
00740 if (timeout <= 0)
00741 timeout = KS_TIMEOUT_DEFAULT;
00742
00743
00744
00745 for (i = 0; i < 3; i++) {
00746 if (ks_socket_send(k, q) < 0)
00747 return NULL;
00748 r = ks_socket_recv(k, ks_bquery_id_get(q), timeout);
00749 if (r != NULL)
00750 return r;
00751 timeout += timeout;
00752 }
00753
00754 return NULL;
00755 }