Main Page | Data Structures | Directories | File List | Data Fields | Globals

ks_socket_posix.c

Go to the documentation of this file.
00001 /*
00002  * libkarmaclient - A C Library to the Karmasphere Reputation Server
00003  * Copyright (C) 2006 Karmasphere <http://www.karmasphere.com/>
00004  *  - Shevek <shevek@karmasphere.com>
00005  *  - Dave Stewart <dave.stewart@karmasphere.com>
00006  *
00007  * This library is free software; you can redistribute it and/or
00008  * modify it under the terms of the GNU Lesser General Public
00009  * License as published by the Free Software Foundation; either
00010  * version 2.1 of the License, or (at your option) any later version.
00011  *
00012  * This library is distributed in the hope that it will be useful,
00013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00015  * Lesser General Public License for more details.
00016  *
00017  * You should have received a copy of the GNU Lesser General Public
00018  * License along with this library; if not, write to the Free Software
00019  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
00020  */
00021 
00022 /*
00023  * XXX NOTE TO PORTERS AND MAINTAINERS:
00024  *
00025  * If you want to do your own socket or I/O management, you do not
00026  * need either this file or ks_socket.h. You can simply use
00027  * ks_bquery_packet() and ks_response_bparse() with byte arrays
00028  * from your own I/O stack.
00029  *
00030  * If you do not have POSIX socket functions but still want to use
00031  * a blocking I/O stack, please contribute a socket implementation
00032  * for your own system conforming to the interface in ks_socket.h.
00033  *
00034  * Greets to Giorgio, Mario and Michael, this is for you.
00035  */
00036 
00037 #include "ks_config.h"
00038 #define RECV_SIZE     65536
00039 
00040 #ifdef HAVE_STRING_H
00041 #include <string.h>
00042 #endif
00043 
00044 #ifdef HAVE_UNISTD_H
00045 #include <unistd.h>
00046 #endif
00047 
00048 #ifdef HAVE_SYS_TYPES_H
00049 #include <sys/types.h>
00050 #endif
00051 
00052 #ifdef HAVE_SYS_SOCKET_H
00053 #include <sys/socket.h>
00054 #endif
00055 
00056 #ifdef HAVE_NETINET_IN_H
00057 #include <netinet/in.h>
00058 #endif
00059 
00060 #ifdef HAVE_ARPA_INET_H
00061 #include <arpa/inet.h>
00062 #endif
00063 
00064 #ifdef HAVE_NETDB_H
00065 #include <netdb.h>
00066 #endif
00067 
00068 /*
00069 #ifdef HAVE_ALLOCA_H
00070 #include <alloca.h>
00071 #endif
00072 */
00073 
00074 /* Required. */
00075 #include <pthread.h>
00076 
00077 #ifdef STDC_HEADERS
00078 #include <stdio.h>
00079 #include <stdlib.h>
00080 #include <errno.h>
00081 #endif
00082 
00083 #ifdef HAVE_SYS_TIME_H
00084 # include <sys/time.h>
00085 # ifdef TIME_WITH_SYS_TIME
00086 #  ifdef HAVE_TIME_H
00087 #  include <sys/time.h>
00088 #  endif
00089 # endif
00090 #else
00091 # ifdef HAVE_TIME_H
00092 # include <sys/time.h>
00093 # endif
00094 #endif
00095 
00096 #ifdef HAVE_SYS_SELECT_H
00097 #include <sys/select.h>
00098 #endif
00099 
00100 #include "ks_socket.h"
00101 #include "ks_malloc.h"
00102 
00103 #define MSECS 1000
00104 #define USECS 1000000
00105 #define NSECS 1000000000
00106 
00114 struct _ks_socket_t {
00115         char            *hostname; 
00116         char            *port;     
00117         int                      proto;    
00119         struct sockaddr_in      
00120                                  addr;     
00121         int                      socket;   
00122         pthread_mutex_t
00123                                  m_send;   
00124         pthread_mutex_t
00125                                  m_recv;   
00126         pthread_mutex_t
00127                                  m_cache;  
00128         pthread_cond_t
00129                                  c_cache;  
00131         ks_array_t       cache;    
00132         char            *princ;    
00133         char            *creds;    
00134         /* XXX
00135          * Any results from getaddrinfo,
00136          * Timeouts for those results.
00137          */
00138 };
00139 
00140 void
00141 ks_socket_set_auth(ks_socket_t *s,
00142                                 const char *principal, const char *credentials)
00143 {
00144         int      len;
00145 
00146         if (s->princ) {
00147                 ks_free(s->princ);
00148                 s->princ = NULL;
00149         }
00150 
00151         if (s->creds) {
00152                 ks_free(s->creds);
00153                 s->creds = NULL;
00154         }
00155         
00156         if (principal != NULL) {
00157                 len = strlen(principal);
00158                 s->princ = ks_malloc(len + 1);
00159                 strcpy(s->princ, principal);
00160         }
00161 
00162         if (credentials != NULL) {
00163                 len = strlen(credentials);
00164                 s->creds = ks_malloc(len + 1);
00165                 strcpy(s->creds, credentials);
00166         }
00167 }
00168 
00169 #define QUEUE_SIZE_MAX  64
00170 
00179 static ks_response_t *
00180 ks_socket_cache_get(ks_socket_t *k, ks_string_t *id)
00181 {
00182         ks_response_t   *r;
00183         int                              i;
00184 
00185         /* It's nonobvious, but this isn't the place to do this. */
00186         // pthread_mutex_lock(&(k->m_cache));
00187 
00188         for (i = 0; i < ks_array_length(&(k->cache)); i++) {
00189                 r = (ks_response_t *)ks_array_get(&(k->cache), i);
00190                 if (ks_string_equals(ks_response_id(r), id)) {
00191                         ks_array_remove(&(k->cache), i);
00192                         return r;
00193                 }
00194         }
00195 
00196         // pthread_mutex_unlock(&(k->m_cache));
00197 
00198         return NULL;
00199 }
00200 
00208 static void
00209 ks_socket_cache_add(ks_socket_t *k, ks_response_t *r)
00210 {
00211         /* It's nonobvious, but this isn't the place to do this. */
00212         // pthread_mutex_lock(&(k->m_cache));
00213 
00214         if (ks_array_length(&(k->cache)) >= QUEUE_SIZE_MAX)
00215                 ks_array_remove(&(k->cache), 0);
00216         ks_array_add(&(k->cache), (ks_base_t *)r);
00217 
00218         // pthread_mutex_unlock(&(k->m_cache));
00219 }
00220 
00227 /* 
00228  * Cache the [relevant bits of the] answer?
00229  * Set an expiry time of 3600s for the info.
00230  * This optional, feel free to do it in connect().
00231  */
00232 static int
00233 ks_socket_addr(ks_socket_t *k)
00234 {
00235         struct addrinfo  hh;
00236         struct addrinfo *ad;
00237         struct addrinfo *ap;
00238         int              ss;
00239         memset(&hh, 0, sizeof(struct addrinfo));
00240         hh.ai_family = AF_INET;
00241         if (k->proto == IPPROTO_TCP)
00242                 hh.ai_socktype = SOCK_STREAM;
00243         else if (k->proto == IPPROTO_UDP)
00244                 hh.ai_socktype = SOCK_DGRAM;
00245         else
00246                 hh.ai_socktype = 0;
00247         hh.ai_protocol = k->proto;
00248         hh.ai_flags = AI_ADDRCONFIG;
00249         ss = getaddrinfo(k->hostname, k->port, &hh, &ad);
00250         if (ss != 0) {
00251                 ks_error("getaddrinfo(%s, %s): %s",
00252                                 k->hostname, k->port, gai_strerror(ss));
00253                 return ss;
00254         }
00255         for (ap = ad; ap; ap = ap->ai_next) {
00256                 if (ap->ai_family == AF_INET) {
00257                         memcpy(&(k->addr), ap->ai_addr, ap->ai_addrlen);
00258                         freeaddrinfo(ad);
00259                         return 0;
00260                 }
00261         }
00262         freeaddrinfo(ad);
00263         return 1;
00264 }
00265 
00274 ks_socket_t *
00275 ks_socket_new(const char *hostname, const char *port, int proto)
00276 {
00277         ks_socket_t     *k;
00278 
00279         if (hostname == NULL)
00280                 hostname = KS_SLAVE_HOST_DEFAULT;
00281         if (port == NULL)
00282                 port = KS_SLAVE_PORT_DEFAULT;
00283 
00284         if (proto == IPPROTO_TCP)
00285                 ;
00286         else if (proto == IPPROTO_UDP)
00287                 ;
00288         else {
00289                 ks_error("Unsupported protocol %d\n", proto);
00290                 return NULL;
00291         }
00292 
00293         /* XXX: Unchecked malloc */
00294         k = (ks_socket_t *)ks_malloc(sizeof(ks_socket_t));
00295         memset(k, 0, sizeof(ks_socket_t));
00296         k->hostname = (char *)ks_malloc(strlen(hostname) + 1);
00297         strcpy(k->hostname, hostname);
00298         k->port = (char *)ks_malloc(strlen(port) + 1);
00299         strcpy(k->port, port);
00300         k->proto = proto;
00301         k->socket = -1;
00302         k->princ = NULL;
00303         k->creds = NULL;
00304 
00305         pthread_mutex_init(&(k->m_send), NULL);
00306         pthread_mutex_init(&(k->m_recv), NULL);
00307         pthread_mutex_init(&(k->m_cache), NULL);
00308         pthread_cond_init(&(k->c_cache), NULL);
00309 
00310         ks_array_init(&(k->cache));
00311         return k;
00312 }
00313 
00321 void
00322 ks_socket_free(ks_socket_t *k)
00323 {
00324         ks_socket_close(k);
00325         ks_free(k->hostname);
00326         ks_free(k->port);
00327 
00328         pthread_mutex_destroy(&(k->m_send));
00329         pthread_mutex_destroy(&(k->m_recv));
00330         pthread_mutex_destroy(&(k->m_cache));
00331         pthread_cond_destroy(&(k->c_cache));
00332 
00333         ks_array_fini(&(k->cache));
00334         if (k->princ)
00335                 ks_free(k->princ);
00336         if (k->creds)
00337                 ks_free(k->creds);
00338         ks_free(k);
00339 }
00340 
00349 /* getaddrinfo() in here */
00350 int
00351 ks_socket_connect(ks_socket_t *k)
00352 {
00353         if (k->socket != -1)
00354                 ks_socket_close(k);
00355 
00356         if (ks_socket_addr(k) != 0)
00357                 return -1;
00358         if (k->proto == IPPROTO_TCP) {
00359                 k->socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
00360         }
00361         else if (k->proto == IPPROTO_UDP) {
00362                 k->socket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
00363         }
00364         else {
00365                 /* NOTREACHED */
00366                 return -1;
00367         }
00368 
00369         if (k->socket == -1) {
00370                 ks_error("ks_socket_connect: socket: %s", strerror(errno));
00371                 return -1;
00372         }
00373         if (connect(k->socket,
00374                         (struct sockaddr *)&(k->addr),
00375                         sizeof(struct sockaddr_in)) < 0) {
00376                 ks_error("ks_socket_connect: connect: %s", strerror(errno));
00377                 return -1;
00378         }
00379 
00380         return 0;
00381 }
00382 
00389 void
00390 ks_socket_close(ks_socket_t *k)
00391 {
00392         if (k->socket != -1)
00393                 close(k->socket);
00394         k->socket = -1;
00395 }
00396 
00397 static int
00398 ks_socket_send_real(ks_socket_t *k, void *buf, int len) {
00399         int              ret;
00400         int              off = 0;
00401 
00402         do {
00403                 ret = write(k->socket, buf + off, len - off);
00404                 if (ret < 0) {
00405                         ks_error("ks_socket_send: write: %s", strerror(errno));
00406                         ks_socket_close(k);
00407                         return ret;
00408                 }
00409                 off += ret;
00410         } while (off < len);
00411 
00412         return off;
00413 }
00414 
00423 ssize_t
00424 ks_socket_send(ks_socket_t *k, ks_bquery_t *q)
00425 {
00426         void            *buf;
00427         ks_string_t     *r;
00428         uint32_t         len;
00429         ssize_t          ret = -1;
00430 
00431         if (k->socket == -1) {
00432                 ret = ks_socket_connect(k);
00433                 if (ret < 0)
00434                         return ret;
00435         }
00436 
00437         if (k->princ != NULL) {
00438                 int authed = ks_bquery_has_auth(q);
00439 
00440                 if (!authed)
00441                         ks_bquery_set_auth(q, k->princ, k->creds);
00442         }
00443 
00444         /* DO NOT RETURN EARLY - you will forget to free this. */
00445         r = ks_bquery_packet(q);
00446 
00447         pthread_mutex_lock(&(k->m_send));
00448 
00449         if (k->proto == IPPROTO_TCP) {
00450 // #if defined(KS_USE_ALLOCA) && defined(HAVE_ALLOCA)
00451 //              buf = alloca(4 + ks_string_length(r));
00452 // #else
00453                 buf = ks_malloc(4 + ks_string_length(r));
00454 // #endif
00455                 len = htonl(ks_string_length(r));
00456                 memcpy(buf, &len, 4);
00457                 memcpy(buf + 4, ks_string_get(r), ks_string_length(r));
00458                 ret = ks_socket_send_real(k, buf, 4 + ks_string_length(r));
00459                 /* XXX This is the place to try a reconnect. */
00460 // #if !defined(KS_USE_ALLOCA) || !defined(HAVE_ALLOCA)
00461                 ks_free(buf);
00462 // #endif
00463         }
00464         else if (k->proto == IPPROTO_UDP) {
00465                 /* Under Linux, the respecification of the target address
00466                  * is redundant. Under BSD, it returns EISCONN - socket
00467                  * is already connected, and we must pass NULL, 0. */
00468                 ret = sendto(k->socket,
00469                                 ks_string_get(r), ks_string_length(r),
00470                                 0,
00471                                 /* (struct sockaddr*)&(k->addr),
00472                                 sizeof(struct sockaddr_in) */
00473                                 NULL, 0);
00474                 if (ret < 0) {
00475                         ks_error("ks_socket_send: sendto: %s", strerror(errno));
00476                         ks_socket_close(k);
00477                 }
00478         }
00479 
00480         pthread_mutex_unlock(&(k->m_send));
00481 
00482         ks_string_free(r);
00483 
00484         return ret;
00485 }
00486 
00487 static int
00488 ks_socket_tv_subtract(struct timeval *out,
00489                                 const struct timeval *x, struct timeval *y)
00490 {
00491         /* Perform the carry for the later subtraction by updating y. */
00492         if (x->tv_usec < y->tv_usec) {
00493                 int nsec = (y->tv_usec - x->tv_usec) / USECS + 1;
00494                 y->tv_usec -= USECS * nsec;
00495                 y->tv_sec += nsec;
00496         }
00497         if (x->tv_usec - y->tv_usec > USECS) {
00498                 int nsec = (x->tv_usec - y->tv_usec) / USECS;
00499                 y->tv_usec += USECS * nsec;
00500                 y->tv_sec -= nsec;
00501         }
00502 
00503         /* Compute the time remaining to wait.
00504          * tv_usec is certainly positive. */
00505         out->tv_sec = x->tv_sec - y->tv_sec;
00506         out->tv_usec = x->tv_usec - y->tv_usec;
00507 
00508         /* Return 1 if result is negative. */
00509         return x->tv_sec < y->tv_sec;
00510 }
00511 
00521 static ks_response_t *
00522 ks_socket_recv_real(ks_socket_t *k, const struct timeval *end)
00523 {
00524         char                     buffer[RECV_SIZE];
00525         uint32_t                 len;
00526         ssize_t                  ret;
00527         uint32_t                 off;
00528 
00529         struct timeval   now;
00530         struct timeval   timeout;
00531         fd_set                   rfds;
00532 
00533         if (k->socket == -1) {
00534                 ks_error("Cannot receive from closed/invalid socket");
00535                 return NULL;
00536         }
00537 
00538         /* Again, it's nonobvious, but this isn't the place. */
00539         // pthread_mutex_lock(&(k->m_recv));
00540 
00541         {
00542                 gettimeofday(&now, NULL);
00543                 if (ks_socket_tv_subtract(&timeout, end, &now)) {
00544                         ks_error("ks_socket_recv: timeout on socket %d", k->socket);
00545                         return NULL;    /* timeout */
00546                 }
00547                 FD_ZERO(&rfds);
00548                 FD_SET(k->socket, &rfds);
00549                 select(k->socket + 1, &rfds, NULL, NULL, &timeout);
00550                 if (!FD_ISSET(k->socket, &rfds)) {
00551                         ks_error("ks_socket_recv: timeout on socket %d", k->socket);
00552                         return NULL;    /* timeout */
00553                 }
00554         }
00555 
00556         if (k->proto == IPPROTO_TCP) {
00557                 len = 1;
00558                 for (off = 0; off < 4; off += ret) {
00559                         ret = read(k->socket, &(buffer[off]), 4 - off);
00560                         if (ret < 0) {
00561                                 ks_error("ks_socket_recv: read: %s", strerror(errno));
00562                                 ks_socket_close(k);
00563                                 len = 0;
00564                                 break;
00565                         }
00566                 }
00567                 if (len > 0) {
00568                         len = ntohl(*(uint32_t *)buffer);
00569                         if (len > RECV_SIZE || len < 2) {
00570                                 ks_error("Cannot receive packet: bad size %u", len);
00571                                 ks_socket_close(k);
00572                                 len = 0;
00573                         }
00574                         else {
00575                                 for (off = 0; off < len; off += ret) {
00576                                         ret = read(k->socket, &(buffer[off]), len - off);
00577                                         if (ret < 0) {
00578                                                 ks_error("ks_socket_recv: read: %s",
00579                                                                 strerror(errno));
00580                                                 ks_socket_close(k);
00581                                                 len = 0;
00582                                                 break;
00583                                         }
00584                                 }
00585                         }
00586                 }
00587         }
00588         else if (k->proto == IPPROTO_UDP) {
00589                 ret = recvfrom(k->socket, buffer,
00590                                 RECV_SIZE, 0,
00591                                 NULL, NULL);
00592                 if (ret < 0) {
00593                         ks_error("ks_socket_recv: recvfrom: %s", strerror(errno));
00594                         ks_socket_close(k);
00595                         len = 0;
00596                 }
00597                 else {
00598                         len = ret;
00599                 }
00600         }
00601         else {
00602                 len = 0;
00603         }
00604 
00605         // pthread_mutex_unlock(&(k->m_recv));
00606 
00607         if (len == 0)
00608                 return NULL;
00609 
00610         return ks_response_bparse(buffer, len);
00611 }
00612 
00613 /* The simple version:
00614 ks_response_t *
00615 ks_socket_recv(ks_socket_t *k, ks_string_t *id)
00616 {
00617         ks_response_t   *r;
00618 
00619         r = ks_socket_cache_get(k, id);
00620         if (r != NULL)
00621                 return r;
00622 
00623         for (;;) {
00624                 r = ks_socket_recv_real(k);
00625                 if (r == NULL)
00626                         return NULL;
00627                 if (id == NULL)
00628                         return r;
00629                 if (ks_string_equals(ks_response_id(r), id))
00630                         return r;
00631                 else
00632                         ks_socket_cache_add(k, r);
00633         }
00634 }
00635 */
00636 
00646 /* XXX: If using a UDP transport and the packet is lost this will not return */
00647 ks_response_t *
00648 ks_socket_recv(ks_socket_t *k, ks_string_t *id, int timeout)
00649 {
00650         ks_response_t   *r;
00651         ks_string_t             *rid;
00652 
00653         if (timeout <= 0)
00654                 timeout = KS_TIMEOUT_DEFAULT;
00655 
00656         struct timeval   tv;
00657         struct timespec  ts;
00658 
00659         {
00660                 gettimeofday(&tv, NULL);
00661                 tv.tv_usec += timeout * (USECS/MSECS);
00662                 tv.tv_sec += (tv.tv_usec / USECS);
00663                 tv.tv_usec %= USECS;
00664 
00665                 ts.tv_sec = tv.tv_sec;
00666                 ts.tv_nsec = tv.tv_usec * (NSECS/USECS);
00667         }
00668 
00669         pthread_mutex_lock(&(k->m_cache));
00670         for (;;) {
00671                 r = ks_socket_cache_get(k, id);
00672                 if (r != NULL) {
00673                         pthread_mutex_unlock(&(k->m_cache));
00674                         return r;
00675                 }
00676                 /* We hold the lock on the cache here.
00677                  * Therefore, the recv thread cannot add to
00678                  * the cache until we are waiting on the
00679                  * condition. */
00680                 if (pthread_mutex_trylock(&(k->m_recv)) != EBUSY) {
00681                         /* Let other people access the cache while
00682                          * this thread is blocked in recv() */
00683                         pthread_mutex_unlock(&(k->m_cache));
00684                         for (;;) {
00685                                 /* We are the recv thread until we get our
00686                                  * own response. */
00687                                 r = ks_socket_recv_real(k, &tv);
00688                                 if (r == NULL)
00689                                         goto recv_done;
00690                                 rid = ks_response_id(r);
00691                                 if (rid == NULL) {
00692                                         /* This should never happen. But it did. */
00693                                         ks_error("ks_socket_recv: received packet has no id");
00694                                         goto recv_done;
00695                                 }
00696                                 if (ks_string_equals(rid, id))
00697                                         goto recv_done;
00698                                 pthread_mutex_lock(&(k->m_cache));
00699                                 ks_socket_cache_add(k, r);
00700                                 pthread_cond_broadcast(&(k->c_cache));
00701                                 pthread_mutex_unlock(&(k->m_cache));
00702                         }
00703                 }
00704                 if (pthread_cond_timedwait(&(k->c_cache), &(k->m_cache), &ts)
00705                                                 == ETIMEDOUT)
00706                         return NULL;
00707         }
00708 
00709 recv_done:
00710         /* Tell someone else to get their act in gear. */
00711         pthread_mutex_unlock(&(k->m_recv));
00712         pthread_mutex_lock(&(k->m_cache));
00713         pthread_cond_broadcast(&(k->c_cache));
00714         pthread_mutex_unlock(&(k->m_cache));
00715         return r;
00716 }
00717 
00733 ks_response_t *
00734 ks_socket_ask(ks_socket_t *k, ks_bquery_t *q, int timeout)
00735                 
00736 {
00737         ks_response_t   *r;
00738         int                              i;
00739 
00740         if (timeout <= 0)
00741                 timeout = KS_TIMEOUT_DEFAULT;
00742 
00743         /* XXX Consider a retry in TCP mode in case the maximum UDP
00744          * packet size for some network segment was exceeded. */
00745         for (i = 0; i < 3; i++) {
00746                 if (ks_socket_send(k, q) < 0)
00747                         return NULL;
00748                 r = ks_socket_recv(k, ks_bquery_id_get(q), timeout);
00749                 if (r != NULL)
00750                         return r;
00751                 timeout += timeout;
00752         }
00753 
00754         return NULL;
00755 }

Generated on Tue Aug 7 21:11:30 2007 for libkarmaclient by  doxygen 1.4.4