pacemaker  1.1.18-36d2962a86
Scalable High-Availability cluster resource manager
remote.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2008 Andrew Beekhof
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with this library; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17  *
18  */
19 #include <crm_internal.h>
20 #include <crm/crm.h>
21 
22 #include <sys/param.h>
23 #include <stdio.h>
24 #include <sys/types.h>
25 #include <sys/stat.h>
26 #include <unistd.h>
27 #include <sys/socket.h>
28 #include <arpa/inet.h>
29 #include <netinet/in.h>
30 #include <netinet/ip.h>
31 #include <netinet/tcp.h>
32 #include <netdb.h>
33 
34 #include <stdlib.h>
35 #include <errno.h>
36 #include <fcntl.h>
37 #include <glib.h>
38 
39 #include <bzlib.h>
40 
41 #include <crm/common/ipcs.h>
42 #include <crm/common/xml.h>
43 #include <crm/common/mainloop.h>
44 
45 #ifdef HAVE_GNUTLS_GNUTLS_H
46 # undef KEYFILE
47 # include <gnutls/gnutls.h>
48 
49 const int psk_tls_kx_order[] = {
50  GNUTLS_KX_DHE_PSK,
51  GNUTLS_KX_PSK,
52 };
53 
54 const int anon_tls_kx_order[] = {
55  GNUTLS_KX_ANON_DH,
56  GNUTLS_KX_DHE_RSA,
57  GNUTLS_KX_DHE_DSS,
58  GNUTLS_KX_RSA,
59  0
60 };
61 #endif
62 
63 /* Swab macros from linux/swab.h */
64 #ifdef HAVE_LINUX_SWAB_H
65 # include <linux/swab.h>
66 #else
67 /*
68  * casts are necessary for constants, because we never know how for sure
69  * how U/UL/ULL map to __u16, __u32, __u64. At least not in a portable way.
70  */
71 #define __swab16(x) ((uint16_t)( \
72  (((uint16_t)(x) & (uint16_t)0x00ffU) << 8) | \
73  (((uint16_t)(x) & (uint16_t)0xff00U) >> 8)))
74 
75 #define __swab32(x) ((uint32_t)( \
76  (((uint32_t)(x) & (uint32_t)0x000000ffUL) << 24) | \
77  (((uint32_t)(x) & (uint32_t)0x0000ff00UL) << 8) | \
78  (((uint32_t)(x) & (uint32_t)0x00ff0000UL) >> 8) | \
79  (((uint32_t)(x) & (uint32_t)0xff000000UL) >> 24)))
80 
81 #define __swab64(x) ((uint64_t)( \
82  (((uint64_t)(x) & (uint64_t)0x00000000000000ffULL) << 56) | \
83  (((uint64_t)(x) & (uint64_t)0x000000000000ff00ULL) << 40) | \
84  (((uint64_t)(x) & (uint64_t)0x0000000000ff0000ULL) << 24) | \
85  (((uint64_t)(x) & (uint64_t)0x00000000ff000000ULL) << 8) | \
86  (((uint64_t)(x) & (uint64_t)0x000000ff00000000ULL) >> 8) | \
87  (((uint64_t)(x) & (uint64_t)0x0000ff0000000000ULL) >> 24) | \
88  (((uint64_t)(x) & (uint64_t)0x00ff000000000000ULL) >> 40) | \
89  (((uint64_t)(x) & (uint64_t)0xff00000000000000ULL) >> 56)))
90 #endif
91 
92 #define REMOTE_MSG_VERSION 1
93 #define ENDIAN_LOCAL 0xBADADBBD
94 
95 struct crm_remote_header_v0
96 {
97  uint32_t endian; /* Detect messages from hosts with different endian-ness */
99  uint64_t id;
100  uint64_t flags;
105 
106  /* New fields get added here */
107 
108 } __attribute__ ((packed));
109 
110 static struct crm_remote_header_v0 *
111 crm_remote_header(crm_remote_t * remote)
112 {
113  struct crm_remote_header_v0 *header = (struct crm_remote_header_v0 *)remote->buffer;
114  if(remote->buffer_offset < sizeof(struct crm_remote_header_v0)) {
115  return NULL;
116 
117  } else if(header->endian != ENDIAN_LOCAL) {
118  uint32_t endian = __swab32(header->endian);
119 
120  CRM_LOG_ASSERT(endian == ENDIAN_LOCAL);
121  if(endian != ENDIAN_LOCAL) {
122  crm_err("Invalid message detected, endian mismatch: %lx is neither %lx nor the swab'd %lx",
123  ENDIAN_LOCAL, header->endian, endian);
124  return NULL;
125  }
126 
127  header->id = __swab64(header->id);
128  header->flags = __swab64(header->flags);
129  header->endian = __swab32(header->endian);
130 
131  header->version = __swab32(header->version);
132  header->size_total = __swab32(header->size_total);
133  header->payload_offset = __swab32(header->payload_offset);
134  header->payload_compressed = __swab32(header->payload_compressed);
135  header->payload_uncompressed = __swab32(header->payload_uncompressed);
136  }
137 
138  return header;
139 }
140 
141 #ifdef HAVE_GNUTLS_GNUTLS_H
142 
143 int
144 crm_initiate_client_tls_handshake(crm_remote_t * remote, int timeout_ms)
145 {
146  int rc = 0;
147  int pollrc = 0;
148  time_t start = time(NULL);
149 
150  do {
151  rc = gnutls_handshake(*remote->tls_session);
152  if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) {
153  pollrc = crm_remote_ready(remote, 1000);
154  if (pollrc < 0) {
155  /* poll returned error, there is no hope */
156  rc = -1;
157  }
158  }
160  } while (((time(NULL) - start) < (timeout_ms / 1000)) &&
161  (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN));
162 
163  if (rc < 0) {
164  crm_trace("gnutls_handshake() failed with %d", rc);
165  }
166  return rc;
167 }
168 
169 void *
170 crm_create_anon_tls_session(int csock, int type /* GNUTLS_SERVER, GNUTLS_CLIENT */ ,
171  void *credentials)
172 {
173  gnutls_session_t *session = gnutls_malloc(sizeof(gnutls_session_t));
174 
175  gnutls_init(session, type);
176 # ifdef HAVE_GNUTLS_PRIORITY_SET_DIRECT
177 /* http://www.manpagez.com/info/gnutls/gnutls-2.10.4/gnutls_81.php#Echo-Server-with-anonymous-authentication */
178  gnutls_priority_set_direct(*session, "NORMAL:+ANON-DH", NULL);
179 /* gnutls_priority_set_direct (*session, "NONE:+VERS-TLS-ALL:+CIPHER-ALL:+MAC-ALL:+SIGN-ALL:+COMP-ALL:+ANON-DH", NULL); */
180 # else
181  gnutls_set_default_priority(*session);
182  gnutls_kx_set_priority(*session, anon_tls_kx_order);
183 # endif
184  gnutls_transport_set_ptr(*session, (gnutls_transport_ptr_t) GINT_TO_POINTER(csock));
185  switch (type) {
186  case GNUTLS_SERVER:
187  gnutls_credentials_set(*session, GNUTLS_CRD_ANON,
188  (gnutls_anon_server_credentials_t) credentials);
189  break;
190  case GNUTLS_CLIENT:
191  gnutls_credentials_set(*session, GNUTLS_CRD_ANON,
192  (gnutls_anon_client_credentials_t) credentials);
193  break;
194  }
195 
196  return session;
197 }
198 
199 void *
200 create_psk_tls_session(int csock, int type /* GNUTLS_SERVER, GNUTLS_CLIENT */ , void *credentials)
201 {
202  gnutls_session_t *session = gnutls_malloc(sizeof(gnutls_session_t));
203 
204  gnutls_init(session, type);
205 # ifdef HAVE_GNUTLS_PRIORITY_SET_DIRECT
206  gnutls_priority_set_direct(*session, "NORMAL:+DHE-PSK:+PSK", NULL);
207 # else
208  gnutls_set_default_priority(*session);
209  gnutls_kx_set_priority(*session, psk_tls_kx_order);
210 # endif
211  gnutls_transport_set_ptr(*session, (gnutls_transport_ptr_t) GINT_TO_POINTER(csock));
212  switch (type) {
213  case GNUTLS_SERVER:
214  gnutls_credentials_set(*session, GNUTLS_CRD_PSK,
215  (gnutls_psk_server_credentials_t) credentials);
216  break;
217  case GNUTLS_CLIENT:
218  gnutls_credentials_set(*session, GNUTLS_CRD_PSK,
219  (gnutls_psk_client_credentials_t) credentials);
220  break;
221  }
222 
223  return session;
224 }
225 
226 static int
227 crm_send_tls(gnutls_session_t * session, const char *buf, size_t len)
228 {
229  const char *unsent = buf;
230  int rc = 0;
231  int total_send;
232 
233  if (buf == NULL) {
234  return -1;
235  }
236 
237  total_send = len;
238  crm_trace("Message size: %llu", (unsigned long long) len);
239 
240  while (TRUE) {
241  rc = gnutls_record_send(*session, unsent, len);
242 
243  if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) {
244  crm_trace("Retrying to send %llu bytes",
245  (unsigned long long) len);
246 
247  } else if (rc < 0) {
248  crm_err("Connection terminated: %s " CRM_XS " rc=%d",
249  gnutls_strerror(rc), rc);
250  break;
251 
252  } else if (rc < len) {
253  crm_debug("Sent %d of %llu bytes", rc, (unsigned long long) len);
254  len -= rc;
255  unsent += rc;
256  } else {
257  crm_trace("Sent all %d bytes", rc);
258  break;
259  }
260  }
261 
262  return rc < 0 ? rc : total_send;
263 }
264 #endif
265 
266 static int
267 crm_send_plaintext(int sock, const char *buf, size_t len)
268 {
269 
270  int rc = 0;
271  const char *unsent = buf;
272  int total_send;
273 
274  if (buf == NULL) {
275  return -1;
276  }
277  total_send = len;
278 
279  crm_trace("Message on socket %d: size=%llu",
280  sock, (unsigned long long) len);
281  retry:
282  rc = write(sock, unsent, len);
283  if (rc < 0) {
284  switch (errno) {
285  case EINTR:
286  case EAGAIN:
287  crm_trace("Retry");
288  goto retry;
289  default:
290  crm_perror(LOG_ERR, "Could only write %d of the remaining %d bytes", rc, (int)len);
291  break;
292  }
293 
294  } else if (rc < len) {
295  crm_trace("Only sent %d of %llu remaining bytes",
296  rc, (unsigned long long) len);
297  len -= rc;
298  unsent += rc;
299  goto retry;
300 
301  } else {
302  crm_trace("Sent %d bytes: %.100s", rc, buf);
303  }
304 
305  return rc < 0 ? rc : total_send;
306 
307 }
308 
309 static int
310 crm_remote_sendv(crm_remote_t * remote, struct iovec * iov, int iovs)
311 {
312  int lpc = 0;
313  int rc = -ESOCKTNOSUPPORT;
314 
315  for(; lpc < iovs; lpc++) {
316 
317 #ifdef HAVE_GNUTLS_GNUTLS_H
318  if (remote->tls_session) {
319  rc = crm_send_tls(remote->tls_session, iov[lpc].iov_base, iov[lpc].iov_len);
320  } else if (remote->tcp_socket) {
321 #else
322  if (remote->tcp_socket) {
323 #endif
324  rc = crm_send_plaintext(remote->tcp_socket, iov[lpc].iov_base, iov[lpc].iov_len);
325 
326  } else {
327  crm_err("Unsupported connection type");
328  }
329  }
330  return rc;
331 }
332 
333 int
334 crm_remote_send(crm_remote_t * remote, xmlNode * msg)
335 {
336  int rc = -1;
337  static uint64_t id = 0;
338  char *xml_text = dump_xml_unformatted(msg);
339 
340  struct iovec iov[2];
341  struct crm_remote_header_v0 *header;
342 
343  if (xml_text == NULL) {
344  crm_err("Invalid XML, can not send msg");
345  return -1;
346  }
347 
348  header = calloc(1, sizeof(struct crm_remote_header_v0));
349  iov[0].iov_base = header;
350  iov[0].iov_len = sizeof(struct crm_remote_header_v0);
351 
352  iov[1].iov_base = xml_text;
353  iov[1].iov_len = 1 + strlen(xml_text);
354 
355  id++;
356  header->id = id;
357  header->endian = ENDIAN_LOCAL;
358  header->version = REMOTE_MSG_VERSION;
359  header->payload_offset = iov[0].iov_len;
360  header->payload_uncompressed = iov[1].iov_len;
361  header->size_total = iov[0].iov_len + iov[1].iov_len;
362 
363  crm_trace("Sending len[0]=%d, start=%x",
364  (int)iov[0].iov_len, *(int*)(void*)xml_text);
365  rc = crm_remote_sendv(remote, iov, 2);
366  if (rc < 0) {
367  crm_err("Failed to send remote msg, rc = %d", rc);
368  }
369 
370  free(iov[0].iov_base);
371  free(iov[1].iov_base);
372  return rc;
373 }
374 
375 
381 xmlNode *
383 {
384  xmlNode *xml = NULL;
385  struct crm_remote_header_v0 *header = crm_remote_header(remote);
386 
387  if (remote->buffer == NULL || header == NULL) {
388  return NULL;
389  }
390 
391  /* Support compression on the receiving end now, in case we ever want to add it later */
392  if (header->payload_compressed) {
393  int rc = 0;
394  unsigned int size_u = 1 + header->payload_uncompressed;
395  char *uncompressed = calloc(1, header->payload_offset + size_u);
396 
397  crm_trace("Decompressing message data %d bytes into %d bytes",
398  header->payload_compressed, size_u);
399 
400  rc = BZ2_bzBuffToBuffDecompress(uncompressed + header->payload_offset, &size_u,
401  remote->buffer + header->payload_offset,
402  header->payload_compressed, 1, 0);
403 
404  if (rc != BZ_OK && header->version > REMOTE_MSG_VERSION) {
405  crm_warn("Couldn't decompress v%d message, we only understand v%d",
406  header->version, REMOTE_MSG_VERSION);
407  free(uncompressed);
408  return NULL;
409 
410  } else if (rc != BZ_OK) {
411  crm_err("Decompression failed: %s (%d)", bz2_strerror(rc), rc);
412  free(uncompressed);
413  return NULL;
414  }
415 
416  CRM_ASSERT(size_u == header->payload_uncompressed);
417 
418  memcpy(uncompressed, remote->buffer, header->payload_offset); /* Preserve the header */
419  remote->buffer_size = header->payload_offset + size_u;
420 
421  free(remote->buffer);
422  remote->buffer = uncompressed;
423  header = crm_remote_header(remote);
424  }
425 
426  /* take ownership of the buffer */
427  remote->buffer_offset = 0;
428 
429  CRM_LOG_ASSERT(remote->buffer[sizeof(struct crm_remote_header_v0) + header->payload_uncompressed - 1] == 0);
430 
431  xml = string2xml(remote->buffer + header->payload_offset);
432  if (xml == NULL && header->version > REMOTE_MSG_VERSION) {
433  crm_warn("Couldn't parse v%d message, we only understand v%d",
434  header->version, REMOTE_MSG_VERSION);
435 
436  } else if (xml == NULL) {
437  crm_err("Couldn't parse: '%.120s'", remote->buffer + header->payload_offset);
438  }
439 
440  return xml;
441 }
442 
452 int
453 crm_remote_ready(crm_remote_t *remote, int total_timeout)
454 {
455  struct pollfd fds = { 0, };
456  int sock = 0;
457  int rc = 0;
458  time_t start;
459  int timeout = total_timeout;
460 
461 #ifdef HAVE_GNUTLS_GNUTLS_H
462  if (remote->tls_session) {
463  void *sock_ptr = gnutls_transport_get_ptr(*remote->tls_session);
464 
465  sock = GPOINTER_TO_INT(sock_ptr);
466  } else if (remote->tcp_socket) {
467 #else
468  if (remote->tcp_socket) {
469 #endif
470  sock = remote->tcp_socket;
471  } else {
472  crm_err("Unsupported connection type");
473  }
474 
475  if (sock <= 0) {
476  crm_trace("No longer connected");
477  return -ENOTCONN;
478  }
479 
480  start = time(NULL);
481  errno = 0;
482  do {
483  fds.fd = sock;
484  fds.events = POLLIN;
485 
486  /* If we got an EINTR while polling, and we have a
487  * specific timeout we are trying to honor, attempt
488  * to adjust the timeout to the closest second. */
489  if (errno == EINTR && (timeout > 0)) {
490  timeout = total_timeout - ((time(NULL) - start) * 1000);
491  if (timeout < 1000) {
492  timeout = 1000;
493  }
494  }
495 
496  rc = poll(&fds, 1, timeout);
497  } while (rc < 0 && errno == EINTR);
498 
499  return (rc < 0)? -errno : rc;
500 }
501 
502 
513 static size_t
514 crm_remote_recv_once(crm_remote_t * remote)
515 {
516  int rc = 0;
517  size_t read_len = sizeof(struct crm_remote_header_v0);
518  struct crm_remote_header_v0 *header = crm_remote_header(remote);
519 
520  if(header) {
521  /* Stop at the end of the current message */
522  read_len = header->size_total;
523  }
524 
525  /* automatically grow the buffer when needed */
526  if(remote->buffer_size < read_len) {
527  remote->buffer_size = 2 * read_len;
528  crm_trace("Expanding buffer to %llu bytes",
529  (unsigned long long) remote->buffer_size);
530 
531  remote->buffer = realloc_safe(remote->buffer, remote->buffer_size + 1);
532  CRM_ASSERT(remote->buffer != NULL);
533  }
534 
535 #ifdef HAVE_GNUTLS_GNUTLS_H
536  if (remote->tls_session) {
537  rc = gnutls_record_recv(*(remote->tls_session),
538  remote->buffer + remote->buffer_offset,
539  remote->buffer_size - remote->buffer_offset);
540  if (rc == GNUTLS_E_INTERRUPTED) {
541  rc = -EINTR;
542  } else if (rc == GNUTLS_E_AGAIN) {
543  rc = -EAGAIN;
544  } else if (rc < 0) {
545  crm_debug("TLS receive failed: %s (%d)", gnutls_strerror(rc), rc);
546  rc = -pcmk_err_generic;
547  }
548  } else if (remote->tcp_socket) {
549 #else
550  if (remote->tcp_socket) {
551 #endif
552  errno = 0;
553  rc = read(remote->tcp_socket,
554  remote->buffer + remote->buffer_offset,
555  remote->buffer_size - remote->buffer_offset);
556  if(rc < 0) {
557  rc = -errno;
558  }
559 
560  } else {
561  crm_err("Unsupported connection type");
562  return -ESOCKTNOSUPPORT;
563  }
564 
565  /* process any errors. */
566  if (rc > 0) {
567  remote->buffer_offset += rc;
568  /* always null terminate buffer, the +1 to alloc always allows for this. */
569  remote->buffer[remote->buffer_offset] = '\0';
570  crm_trace("Received %u more bytes, %llu total",
571  rc, (unsigned long long) remote->buffer_offset);
572 
573  } else if (rc == -EINTR || rc == -EAGAIN) {
574  crm_trace("non-blocking, exiting read: %s (%d)", pcmk_strerror(rc), rc);
575 
576  } else if (rc == 0) {
577  crm_debug("EOF encoutered after %llu bytes",
578  (unsigned long long) remote->buffer_offset);
579  return -ENOTCONN;
580 
581  } else {
582  crm_debug("Error receiving message after %llu bytes: %s (%d)",
583  (unsigned long long) remote->buffer_offset,
584  pcmk_strerror(rc), rc);
585  return -ENOTCONN;
586  }
587 
588  header = crm_remote_header(remote);
589  if(header) {
590  if(remote->buffer_offset < header->size_total) {
591  crm_trace("Read less than the advertised length: %llu < %u bytes",
592  (unsigned long long) remote->buffer_offset,
593  header->size_total);
594  } else {
595  crm_trace("Read full message of %llu bytes",
596  (unsigned long long) remote->buffer_offset);
597  return remote->buffer_offset;
598  }
599  }
600 
601  return -EAGAIN;
602 }
603 
614 gboolean
615 crm_remote_recv(crm_remote_t *remote, int total_timeout, int *disconnected)
616 {
617  int rc;
618  time_t start = time(NULL);
619  int remaining_timeout = 0;
620 
621  if (total_timeout == 0) {
622  total_timeout = 10000;
623  } else if (total_timeout < 0) {
624  total_timeout = 60000;
625  }
626  *disconnected = 0;
627 
628  remaining_timeout = total_timeout;
629  while ((remaining_timeout > 0) && !(*disconnected)) {
630 
631  crm_trace("Waiting for remote data (%d of %d ms timeout remaining)",
632  remaining_timeout, total_timeout);
633  rc = crm_remote_ready(remote, remaining_timeout);
634 
635  if (rc == 0) {
636  crm_err("Timed out (%d ms) while waiting for remote data",
637  remaining_timeout);
638  return FALSE;
639 
640  } else if (rc < 0) {
641  crm_debug("Wait for remote data aborted, will try again: %s "
642  CRM_XS " rc=%d", pcmk_strerror(rc), rc);
643 
644  } else {
645  rc = crm_remote_recv_once(remote);
646  if (rc > 0) {
647  return TRUE;
648  } else if (rc == -EAGAIN) {
649  crm_trace("Still waiting for remote data");
650  } else if (rc < 0) {
651  crm_debug("Could not receive remote data: %s " CRM_XS " rc=%d",
652  pcmk_strerror(rc), rc);
653  }
654  }
655 
656  if (rc == -ENOTCONN) {
657  *disconnected = 1;
658  return FALSE;
659  }
660 
661  remaining_timeout = total_timeout - ((time(NULL) - start) * 1000);
662  }
663 
664  return FALSE;
665 }
666 
667 struct tcp_async_cb_data {
668  gboolean success;
669  int sock;
670  void *userdata;
671  void (*callback) (void *userdata, int sock);
672  int timeout; /*ms */
673  time_t start;
674 };
675 
676 static gboolean
677 check_connect_finished(gpointer userdata)
678 {
679  struct tcp_async_cb_data *cb_data = userdata;
680  int rc = 0;
681  int sock = cb_data->sock;
682  int error = 0;
683 
684  fd_set rset, wset;
685  socklen_t len = sizeof(error);
686  struct timeval ts = { 0, };
687 
688  if (cb_data->success == TRUE) {
689  goto dispatch_done;
690  }
691 
692  FD_ZERO(&rset);
693  FD_SET(sock, &rset);
694  wset = rset;
695 
696  crm_trace("fd %d: checking to see if connect finished", sock);
697  rc = select(sock + 1, &rset, &wset, NULL, &ts);
698 
699  if (rc < 0) {
700  rc = errno;
701  if ((errno == EINPROGRESS) || (errno == EAGAIN)) {
702  /* reschedule if there is still time left */
703  if ((time(NULL) - cb_data->start) < (cb_data->timeout / 1000)) {
704  goto reschedule;
705  } else {
706  rc = -ETIMEDOUT;
707  }
708  }
709  crm_trace("fd %d: select failed %d connect dispatch ", sock, rc);
710  goto dispatch_done;
711  } else if (rc == 0) {
712  if ((time(NULL) - cb_data->start) < (cb_data->timeout / 1000)) {
713  goto reschedule;
714  }
715  crm_debug("fd %d: timeout during select", sock);
716  rc = -ETIMEDOUT;
717  goto dispatch_done;
718  } else {
719  crm_trace("fd %d: select returned success", sock);
720  rc = 0;
721  }
722 
723  /* can we read or write to the socket now? */
724  if (FD_ISSET(sock, &rset) || FD_ISSET(sock, &wset)) {
725  if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
726  crm_trace("fd %d: call to getsockopt failed", sock);
727  rc = -1;
728  goto dispatch_done;
729  }
730 
731  if (error) {
732  crm_trace("fd %d: error returned from getsockopt: %d", sock, error);
733  rc = -1;
734  goto dispatch_done;
735  }
736  } else {
737  crm_trace("neither read nor write set after select");
738  rc = -1;
739  goto dispatch_done;
740  }
741 
742  dispatch_done:
743  if (!rc) {
744  crm_trace("fd %d: connected", sock);
745  /* Success, set the return code to the sock to report to the callback */
746  rc = cb_data->sock;
747  cb_data->sock = 0;
748  } else {
749  close(sock);
750  }
751 
752  if (cb_data->callback) {
753  cb_data->callback(cb_data->userdata, rc);
754  }
755  free(cb_data);
756  return FALSE;
757 
758  reschedule:
759 
760  /* will check again next interval */
761  return TRUE;
762 }
763 
764 static int
765 internal_tcp_connect_async(int sock,
766  const struct sockaddr *addr, socklen_t addrlen, int timeout /* ms */ ,
767  int *timer_id, void *userdata, void (*callback) (void *userdata, int sock))
768 {
769  int rc = 0;
770  int flag = 0;
771  int interval = 500;
772  int timer;
773  struct tcp_async_cb_data *cb_data = NULL;
774 
775  if ((flag = fcntl(sock, F_GETFL)) >= 0) {
776  if (fcntl(sock, F_SETFL, flag | O_NONBLOCK) < 0) {
777  crm_err("fcntl() write failed");
778  return -1;
779  }
780  }
781 
782  rc = connect(sock, addr, addrlen);
783 
784  if (rc < 0 && (errno != EINPROGRESS) && (errno != EAGAIN)) {
785  return -1;
786  }
787 
788  cb_data = calloc(1, sizeof(struct tcp_async_cb_data));
789  cb_data->userdata = userdata;
790  cb_data->callback = callback;
791  cb_data->sock = sock;
792  cb_data->timeout = timeout;
793  cb_data->start = time(NULL);
794 
795  if (rc == 0) {
796  /* The connect was successful immediately, we still return to mainloop
797  * and let this callback get called later. This avoids the user of this api
798  * to have to account for the fact the callback could be invoked within this
799  * function before returning. */
800  cb_data->success = TRUE;
801  interval = 1;
802  }
803 
804  /* Check connect finished is mostly doing a non-block poll on the socket
805  * to see if we can read/write to it. Once we can, the connect has completed.
806  * This method allows us to connect to the server without blocking mainloop.
807  *
808  * This is a poor man's way of polling to see when the connection finished.
809  * At some point we should figure out a way to use a mainloop fd callback for this.
810  * Something about the way mainloop is currently polling prevents this from working at the
811  * moment though. */
812  crm_trace("fd %d: scheduling to check if connect finished in %dms second", sock, interval);
813  timer = g_timeout_add(interval, check_connect_finished, cb_data);
814  if (timer_id) {
815  *timer_id = timer;
816  }
817 
818  return 0;
819 }
820 
821 static int
822 internal_tcp_connect(int sock, const struct sockaddr *addr, socklen_t addrlen)
823 {
824  int flag = 0;
825  int rc = connect(sock, addr, addrlen);
826 
827  if (rc == 0) {
828  if ((flag = fcntl(sock, F_GETFL)) >= 0) {
829  if (fcntl(sock, F_SETFL, flag | O_NONBLOCK) < 0) {
830  crm_err("fcntl() write failed");
831  return -1;
832  }
833  }
834  }
835 
836  return rc;
837 }
838 
845 int
846 crm_remote_tcp_connect_async(const char *host, int port, int timeout, /*ms */
847  int *timer_id, void *userdata, void (*callback) (void *userdata, int sock))
848 {
849  char buffer[INET6_ADDRSTRLEN];
850  struct addrinfo *res = NULL;
851  struct addrinfo *rp = NULL;
852  struct addrinfo hints;
853  const char *server = host;
854  int ret_ga;
855  int sock = -1;
856 
857  /* getaddrinfo */
858  memset(&hints, 0, sizeof(struct addrinfo));
859  hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */
860  hints.ai_socktype = SOCK_STREAM;
861  hints.ai_flags = AI_CANONNAME;
862 
863  crm_debug("Looking up %s", server);
864  ret_ga = getaddrinfo(server, NULL, &hints, &res);
865  if (ret_ga) {
866  crm_err("getaddrinfo: %s", gai_strerror(ret_ga));
867  return -1;
868  }
869 
870  if (!res || !res->ai_addr) {
871  crm_err("getaddrinfo failed");
872  goto async_cleanup;
873  }
874 
875  for (rp = res; rp != NULL; rp = rp->ai_next) {
876  struct sockaddr *addr = rp->ai_addr;
877 
878  if (!addr) {
879  continue;
880  }
881 
882  if (rp->ai_canonname) {
883  server = res->ai_canonname;
884  }
885  crm_debug("Got address %s for %s", server, host);
886 
887  /* create socket */
888  sock = socket(rp->ai_family, SOCK_STREAM, IPPROTO_TCP);
889  if (sock == -1) {
890  crm_err("Socket creation failed for remote client connection.");
891  continue;
892  }
893 
894  /* Set port appropriately for address family */
895  /* (void*) casts avoid false-positive compiler alignment warnings */
896  if (addr->sa_family == AF_INET6) {
897  ((struct sockaddr_in6 *)(void*)addr)->sin6_port = htons(port);
898  } else {
899  ((struct sockaddr_in *)(void*)addr)->sin_port = htons(port);
900  }
901 
902  memset(buffer, 0, DIMOF(buffer));
903  crm_sockaddr2str(addr, buffer);
904  crm_info("Attempting to connect to remote server at %s:%d", buffer, port);
905 
906  if (callback) {
907  if (internal_tcp_connect_async
908  (sock, rp->ai_addr, rp->ai_addrlen, timeout, timer_id, userdata, callback) == 0) {
909  goto async_cleanup; /* Success for now, we'll hear back later in the callback */
910  }
911 
912  } else {
913  if (internal_tcp_connect(sock, rp->ai_addr, rp->ai_addrlen) == 0) {
914  break; /* Success */
915  }
916  }
917 
918  close(sock);
919  sock = -1;
920  }
921 
922 async_cleanup:
923 
924  if (res) {
925  freeaddrinfo(res);
926  }
927  return sock;
928 }
929 
930 int
931 crm_remote_tcp_connect(const char *host, int port)
932 {
933  return crm_remote_tcp_connect_async(host, port, -1, NULL, NULL, NULL);
934 }
935 
946 void
947 crm_sockaddr2str(void *sa, char *s)
948 {
949  switch (((struct sockaddr*)sa)->sa_family) {
950  case AF_INET:
951  inet_ntop(AF_INET, &(((struct sockaddr_in *)sa)->sin_addr),
952  s, INET6_ADDRSTRLEN);
953  break;
954 
955  case AF_INET6:
956  inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)sa)->sin6_addr),
957  s, INET6_ADDRSTRLEN);
958  break;
959 
960  default:
961  strcpy(s, "<invalid>");
962  }
963 }
964 
965 int
967 {
968  int csock = 0;
969  int rc = 0;
970  int flag = 0;
971  unsigned laddr = 0;
972  struct sockaddr_storage addr;
973  char addr_str[INET6_ADDRSTRLEN];
974 #ifdef TCP_USER_TIMEOUT
975  int optval;
976  long sbd_timeout = crm_get_sbd_timeout();
977 #endif
978 
979  /* accept the connection */
980  laddr = sizeof(addr);
981  memset(&addr, 0, sizeof(addr));
982  csock = accept(ssock, (struct sockaddr *)&addr, &laddr);
983  crm_sockaddr2str(&addr, addr_str);
984  crm_info("New remote connection from %s", addr_str);
985 
986  if (csock == -1) {
987  crm_err("accept socket failed");
988  return -1;
989  }
990 
991  if ((flag = fcntl(csock, F_GETFL)) >= 0) {
992  if ((rc = fcntl(csock, F_SETFL, flag | O_NONBLOCK)) < 0) {
993  crm_err("fcntl() write failed");
994  close(csock);
995  return rc;
996  }
997  } else {
998  crm_err("fcntl() read failed");
999  close(csock);
1000  return flag;
1001  }
1002 
1003 #ifdef TCP_USER_TIMEOUT
1004  if (sbd_timeout > 0) {
1005  optval = sbd_timeout / 2; /* time to fail and retry before watchdog */
1006  rc = setsockopt(csock, SOL_TCP, TCP_USER_TIMEOUT,
1007  &optval, sizeof(optval));
1008  if (rc < 0) {
1009  crm_err("setting TCP_USER_TIMEOUT (%d) on client socket failed",
1010  optval);
1011  close(csock);
1012  return rc;
1013  }
1014  }
1015 #endif
1016 
1017  return csock;
1018 }
1019 
1025 int
1027 {
1028  static int port = 0;
1029 
1030  if (port == 0) {
1031  const char *env = getenv("PCMK_remote_port");
1032 
1033  if (env) {
1034  errno = 0;
1035  port = strtol(env, NULL, 10);
1036  if (errno || (port < 1) || (port > 65535)) {
1037  crm_warn("Environment variable PCMK_remote_port has invalid value '%s', using %d instead",
1038  env, DEFAULT_REMOTE_PORT);
1039  port = DEFAULT_REMOTE_PORT;
1040  }
1041  } else {
1042  port = DEFAULT_REMOTE_PORT;
1043  }
1044  }
1045  return port;
1046 }
A dumping ground.
size_t buffer_offset
Definition: ipcs.h:46
long crm_get_sbd_timeout(void)
Definition: watchdog.c:246
uint32_t payload_compressed
Definition: remote.c:159
const char * pcmk_strerror(int rc)
Definition: logging.c:1135
char * buffer
Definition: ipcs.h:44
AIS_Host host
Definition: internal.h:52
uint32_t payload_uncompressed
Definition: remote.c:160
#define CRM_LOG_ASSERT(expr)
Definition: logging.h:150
#define ENDIAN_LOCAL
Definition: remote.c:93
int crm_default_remote_port()
Get the default remote connection TCP port on this host.
Definition: remote.c:1026
Wrappers for and extensions to glib mainloop.
uint32_t endian
Definition: remote.c:153
xmlNode * string2xml(const char *input)
Definition: xml.c:2750
int crm_remote_send(crm_remote_t *remote, xmlNode *msg)
Definition: remote.c:334
#define DEFAULT_REMOTE_PORT
Definition: lrmd.h:54
int crm_remote_tcp_connect(const char *host, int port)
Definition: remote.c:931
#define crm_warn(fmt, args...)
Definition: logging.h:249
#define crm_debug(fmt, args...)
Definition: logging.h:253
int crm_initiate_client_tls_handshake(crm_remote_t *remote, int timeout_ms)
void crm_sockaddr2str(void *sa, char *s)
Convert an IP address (IPv4 or IPv6) to a string for logging.
Definition: remote.c:947
void gnutls_session_t
Definition: cib_remote.c:52
int crm_remote_accept(int ssock)
Definition: remote.c:966
#define crm_trace(fmt, args...)
Definition: logging.h:254
uint64_t id
Definition: remote.c:155
int crm_remote_ready(crm_remote_t *remote, int total_timeout)
Definition: remote.c:453
#define __swab64(x)
Definition: remote.c:81
Wrappers for and extensions to libxml2.
#define pcmk_err_generic
Definition: error.h:45
uint32_t payload_offset
Definition: remote.c:158
struct tcp_async_cb_data __attribute__
uint32_t size_total
Definition: remote.c:157
#define CRM_XS
Definition: logging.h:42
#define __swab32(x)
Definition: remote.c:75
#define crm_perror(level, fmt, args...)
Log a system error message.
Definition: logging.h:226
size_t buffer_size
Definition: ipcs.h:45
#define REMOTE_MSG_VERSION
Definition: remote.c:92
#define crm_err(fmt, args...)
Definition: logging.h:248
const char * bz2_strerror(int rc)
Definition: logging.c:1198
char * dump_xml_unformatted(xmlNode *msg)
Definition: xml.c:3825
#define DIMOF(a)
Definition: crm.h:39
xmlNode * crm_remote_parse_buffer(crm_remote_t *remote)
Definition: remote.c:382
#define uint32_t
Definition: stdint.in.h:158
#define CRM_ASSERT(expr)
Definition: error.h:35
void * create_psk_tls_session(int csock, int type, void *credentials)
void * crm_create_anon_tls_session(int sock, int type, void *credentials)
int tcp_socket
Definition: ipcs.h:48
int crm_remote_tcp_connect_async(const char *host, int port, int timeout, int *timer_id, void *userdata, void(*callback)(void *userdata, int sock))
Definition: remote.c:846
#define crm_info(fmt, args...)
Definition: logging.h:251
gboolean crm_remote_recv(crm_remote_t *remote, int total_timeout, int *disconnected)
Definition: remote.c:615
uint32_t version
Definition: remote.c:154
uint64_t flags
Definition: remote.c:156
enum crm_ais_msg_types type
Definition: internal.h:51