Asynchronous sockets work !

Still need a bit of tuning but it's stable enough
for our current needs
This commit is contained in:
Aris Adamantiadis
2009-12-01 23:34:55 +01:00
parent 7962029bdc
commit 4924ac8099
3 changed files with 55 additions and 15 deletions

View File

@@ -119,14 +119,15 @@ struct ssh_socket_callbacks_struct {
}; };
typedef struct ssh_socket_callbacks_struct *ssh_socket_callbacks; typedef struct ssh_socket_callbacks_struct *ssh_socket_callbacks;
#define SSH_SOCKET_FLOW_WRITEWILLBLOCK (1<<0) #define SSH_SOCKET_FLOW_WRITEWILLBLOCK 1
#define SSH_SOCKET_FLOW_WRITEWONTBLOCK (1<<1) #define SSH_SOCKET_FLOW_WRITEWONTBLOCK 2
#define SSH_SOCKET_EXCEPTION_EOF (1<<0)
#define SSH_SOCKET_EXCEPTION_ERROR (1<<1)
#define SSH_SOCKET_CONNECTED_OK (1<<0) #define SSH_SOCKET_EXCEPTION_EOF 1
#define SSH_SOCKET_CONNECTED_ERROR (1<<1) #define SSH_SOCKET_EXCEPTION_ERROR 2
#define SSH_SOCKET_CONNECTED_TIMEOUT (1<<2)
#define SSH_SOCKET_CONNECTED_OK 1
#define SSH_SOCKET_CONNECTED_ERROR 2
#define SSH_SOCKET_CONNECTED_TIMEOUT 3
/** Initializes an ssh_callbacks_struct /** Initializes an ssh_callbacks_struct
* A call to this macro is mandatory when you have set a new * A call to this macro is mandatory when you have set a new

View File

@@ -135,9 +135,24 @@ int ssh_socket_pollcallback(ssh_poll_handle p, int fd, int revents, void *v_s){
struct socket *s=(struct socket *)v_s; struct socket *s=(struct socket *)v_s;
char buffer[4096]; char buffer[4096];
int r,w; int r,w;
(void)fd; int err=0;
socklen_t errlen=sizeof(err);
if(revents & POLLERR){ if(revents & POLLERR){
s->data_except=1; /* Check if we are in a connecting state */
if(s->state==SSH_SOCKET_CONNECTING){
s->state=SSH_SOCKET_ERROR;
ssh_poll_free(p);
s->poll=p=NULL;
getsockopt(fd,SOL_SOCKET,SO_ERROR,(void *)&err,&errlen);
s->last_errno=err;
close(fd);
s->fd=-1;
if(s->callbacks && s->callbacks->connected)
s->callbacks->connected(SSH_SOCKET_CONNECTED_ERROR,err,
s->callbacks->user);
return 0;
}
/* Then we are in a more standard kind of error */
/* force a read to get an explanation */ /* force a read to get an explanation */
revents |= POLLIN; revents |= POLLIN;
} }
@@ -145,7 +160,8 @@ int ssh_socket_pollcallback(ssh_poll_handle p, int fd, int revents, void *v_s){
s->data_to_read=1; s->data_to_read=1;
r=ssh_socket_unbuffered_read(s,buffer,sizeof(buffer)); r=ssh_socket_unbuffered_read(s,buffer,sizeof(buffer));
if(r<0){ if(r<0){
ssh_poll_set_events(p,ssh_poll_get_events(p) & ~POLLIN); if(p != NULL)
ssh_poll_set_events(p,ssh_poll_get_events(p) & ~POLLIN);
if(s->callbacks){ if(s->callbacks){
s->callbacks->exception( s->callbacks->exception(
SSH_SOCKET_EXCEPTION_ERROR, SSH_SOCKET_EXCEPTION_ERROR,
@@ -187,6 +203,8 @@ int ssh_socket_pollcallback(ssh_poll_handle p, int fd, int revents, void *v_s){
if(buffer_get_rest_len(s->out_buffer) > 0){ if(buffer_get_rest_len(s->out_buffer) > 0){
w=ssh_socket_unbuffered_write(s, buffer_get_rest(s->out_buffer), w=ssh_socket_unbuffered_write(s, buffer_get_rest(s->out_buffer),
buffer_get_rest_len(s->out_buffer)); buffer_get_rest_len(s->out_buffer));
if(w>0)
buffer_pass_bytes(s->out_buffer,w);
} else if(s->callbacks){ } else if(s->callbacks){
/* Otherwise advertise the upper level that write can be done */ /* Otherwise advertise the upper level that write can be done */
s->callbacks->controlflow(SSH_SOCKET_FLOW_WRITEWONTBLOCK,s->callbacks->user); s->callbacks->controlflow(SSH_SOCKET_FLOW_WRITEWONTBLOCK,s->callbacks->user);
@@ -643,6 +661,7 @@ int ssh_socket_nonblocking_flush(struct socket *s) {
session->alive = 0; session->alive = 0;
ssh_socket_close(s); ssh_socket_close(s);
/* FIXME use ssh_socket_get_errno() */ /* FIXME use ssh_socket_get_errno() */
/* FIXME use callback for errors */
ssh_set_error(session, SSH_FATAL, ssh_set_error(session, SSH_FATAL,
"Writing packet: error on socket (or connection closed): %s", "Writing packet: error on socket (or connection closed): %s",
strerror(s->last_errno)); strerror(s->last_errno));
@@ -752,7 +771,13 @@ int ssh_socket_get_status(struct socket *s) {
* @brief Launches a socket connection * @brief Launches a socket connection
* If a the socket connected callback has been defined and * If a the socket connected callback has been defined and
* a poll object exists, this call will be non blocking * a poll object exists, this call will be non blocking
* @param * @param host hostname or ip address to connect to
* @param port port number to connect to
* @param bind_addr address to bind to, or NULL for default
* @returns SSH_OK socket is being connected
* @returns SSH_ERROR error while connecting to remote host
* @bug It only tries connecting to one of the available AI's
* which is problematic for hosts having DNS fail-over
*/ */
int ssh_socket_connect(struct socket *s, const char *host, int port, const char *bind_addr){ int ssh_socket_connect(struct socket *s, const char *host, int port, const char *bind_addr){
@@ -764,6 +789,8 @@ int ssh_socket_connect(struct socket *s, const char *host, int port, const char
return SSH_ERROR; return SSH_ERROR;
fd=ssh_connect_host_nonblocking(s->session,host,bind_addr,port); fd=ssh_connect_host_nonblocking(s->session,host,bind_addr,port);
ssh_log(session,SSH_LOG_PROTOCOL,"Nonblocking connection socket: %d",fd); ssh_log(session,SSH_LOG_PROTOCOL,"Nonblocking connection socket: %d",fd);
if(fd < 0)
return SSH_ERROR;
ssh_socket_set_fd(s,fd); ssh_socket_set_fd(s,fd);
s->state=SSH_SOCKET_CONNECTING; s->state=SSH_SOCKET_CONNECTING;
if(s->callbacks && s->callbacks->connected && s->poll){ if(s->callbacks && s->callbacks->connected && s->poll){

View File

@@ -30,10 +30,15 @@
#include <libssh/socket.h> #include <libssh/socket.h>
#include <libssh/poll.h> #include <libssh/poll.h>
int stop=0;
struct socket *s;
static int data_rcv(const void *data, size_t len, void *user){ static int data_rcv(const void *data, size_t len, void *user){
printf("Received data: '"); printf("Received data: '");
fwrite(data,1,len,stdout); fwrite(data,1,len,stdout);
printf("'\n"); printf("'\n");
ssh_socket_write(s,"Hello you !\n",12);
ssh_socket_nonblocking_flush(s);
return len; return len;
} }
@@ -43,10 +48,16 @@ static void controlflow(int code,void *user){
static void exception(int code, int errno_code,void *user){ static void exception(int code, int errno_code,void *user){
printf("Exception: %d (%d)\n",code,errno_code); printf("Exception: %d (%d)\n",code,errno_code);
stop=1;
} }
static void connected(int code, int errno_code,void *user){ static void connected(int code, int errno_code,void *user){
printf("Connected: %d (%d)\n",code, errno_code); if(code == SSH_SOCKET_CONNECTED_OK)
printf("Connected: %d (%d)\n",code, errno_code);
else {
printf("Error while connecting:(%d, %d:%s)\n",code,errno_code,strerror(errno_code));
stop=1;
}
} }
struct ssh_socket_callbacks_struct callbacks={ struct ssh_socket_callbacks_struct callbacks={
@@ -57,7 +68,6 @@ struct ssh_socket_callbacks_struct callbacks={
NULL NULL
}; };
int main(int argc, char **argv){ int main(int argc, char **argv){
struct socket *s;
ssh_session session; ssh_session session;
ssh_poll_ctx ctx; ssh_poll_ctx ctx;
int verbosity=SSH_LOG_FUNCTIONS; int verbosity=SSH_LOG_FUNCTIONS;
@@ -72,10 +82,12 @@ int main(int argc, char **argv){
ctx=ssh_poll_ctx_new(2); ctx=ssh_poll_ctx_new(2);
ssh_socket_set_callbacks(s, &callbacks); ssh_socket_set_callbacks(s, &callbacks);
ssh_poll_ctx_add_socket(ctx,s); ssh_poll_ctx_add_socket(ctx,s);
if(ssh_socket_connect(s,argv[1],atoi(argv[2]),NULL)){ if(ssh_socket_connect(s,argv[1],atoi(argv[2]),NULL) != SSH_OK){
printf("ssh_socket_connect: %s\n",ssh_get_error(session)); printf("ssh_socket_connect: %s\n",ssh_get_error(session));
return EXIT_FAILURE; return EXIT_FAILURE;
} }
ssh_poll_ctx_dopoll(ctx,-1); while(!stop)
ssh_poll_ctx_dopoll(ctx,-1);
printf("finished\n");
return EXIT_SUCCESS; return EXIT_SUCCESS;
} }