La struttura base è formata dalla creazione/avvio del server e dalla funzione 'gest' di gestione della request. La funzione riceve un dizionario 'environ' con le variabili di ambiente del server, e una funzione 'start_response' da chiamare per iniziare la risposta. Infine deve ritornare una sequenza (lista o tupla) con i dati da inviare al browser.
In questi esempi il server va chiamato con:
http://localhost:8888
# -*- coding: utf-8 -*- #------------------------------------------------------------------------------ from wsgiref.simple_server import make_server import sys #------------------------------------------------------------------------------ PY3 = sys.version_info[0] > 2 ADDR, PORT = "localhost", 8888 # Indirizzo a cui risponde il server ENCODING = "utf-8" #------------------------------------------------------------------------------ # Gestore della richiesta HTTP/WSGI #------------------------------------------------------------------------------ def gest(environ, start_response): page = "<html><body>~~~Hello WEB!~~~</body></html>" # Testo HTML if PY3: page = page.encode(ENCODING) status = "200 OK" headers = [ ("Content-type", "text/html"), ("Content-Length", str(len(page))) ] start_response(status, headers) return [page] #------------------------------------------------------------------------------ my_server = make_server(ADDR, PORT, gest) # Crea oggetto server my_server.serve_forever() # Avvia il server, se lanciato da # terminale interrompere con CTRL+C
Le variabili environ più interessanti sono REQUEST_METHOD, PATH_INFO e QUERY_STRING, che permettono di determinare cosa viene richiesto dal browser.
# -*- coding: utf-8 -*- #------------------------------------------------------------------------------ from wsgiref.simple_server import make_server import sys #------------------------------------------------------------------------------ PY3 = sys.version_info[0] > 2 ADDR, PORT = "localhost", 8888 # Indirizzo a cui risponde il server ENCODING = "utf-8" #------------------------------------------------------------------------------ HTML = ''' <html> <head> <meta http-equiv="content-type" content="text/html; charset=UTF-8"> </head> <body style="font-size:20px; margin:64px; background-color: rgb(200,200,200)"> <span style="color: red; font-weight:bold"> <u>VARIABILI ENVIRON</u> </span> <br><br> %s </body> </html> ''' #------------------------------------------------------------------------------ # Gestore della richiesta HTTP/WSGI #------------------------------------------------------------------------------ def gest(environ, start_response): environ_list = [] for k, v in environ.items(): environ_list.append("<b>%s:</b> %s" % (k, v)) page = HTML % "<br>\n".join(sorted(environ_list)) if PY3: page = page.encode(ENCODING) status = "200 OK" headers = [ ("Content-type", "text/html"), ("Content-Length", str(len(page))) ] start_response(status, headers) return [page] #------------------------------------------------------------------------------ my_server = make_server(ADDR, PORT, gest) # Crea oggetto server my_server.serve_forever() # Avvia il server
Da QUERY_STRING si può ottenere un dizionario con le variabili passate dal browser attraverso l'URL (REQUEST_METHOD di tipo GET).
Ad esempio il seguente URL produce le variabili 'a' 'b' e 'c':
http://localhost:8888/?a=12&b=popp&c=millions
# -*- coding: utf-8 -*- #------------------------------------------------------------------------------ from wsgiref.simple_server import make_server from cgi import parse_qs, escape import sys #------------------------------------------------------------------------------ PY3 = sys.version_info[0] > 2 ADDR, PORT = "localhost", 8888 # Indirizzo a cui risponde il server ENCODING = "utf-8" #------------------------------------------------------------------------------ HTML = ''' <html> <head> <meta http-equiv="content-type" content="text/html; charset=UTF-8"> </head> <body style="font-size:20px; margin:64px; background-color: rgb(200,200,200)"> <span style="color: red; font-weight:bold"> <u>VARIABILI QUERY_STRING</u> </span> <br><br> %s </body> </html> ''' #------------------------------------------------------------------------------ # Gestore della richiesta HTTP/WSGI #------------------------------------------------------------------------------ def gest(environ, start_response): query_dict = parse_qs(environ["QUERY_STRING"]) query_list = [] for k, v in query_dict.items(): sanitized_list = list(map(escape, v)) query_list.append("<b>%s:</b> %s" % (k, str(sanitized_list))) page = HTML % "<br>\n".join(sorted(query_list)) if PY3: page = page.encode(ENCODING) status = "200 OK" headers = [ ("Content-type", "text/html"), ("Content-Length", str(len(page))) ] start_response(status, headers) return [page] #------------------------------------------------------------------------------ my_server = make_server(ADDR, PORT, gest) my_server.serve_forever()
Una richiesta di tipo POST avviene tramite l'invio di un form HTML.
# -*- coding: utf-8 -*- #------------------------------------------------------------------------------ from wsgiref.simple_server import make_server from cgi import parse_qs, escape import sys #------------------------------------------------------------------------------ PY3 = sys.version_info[0] > 2 ADDR, PORT = "localhost", 8888 # Indirizzo a cui risponde il server ENCODING = "utf-8" #------------------------------------------------------------------------------ HTML = ''' <html> <head> <meta http-equiv="content-type" content="text/html; charset=UTF-8"> </head> <body style="font-size:20px; margin:64px; background-color: rgb(200,200,200)"> <span style="color: red; font-weight:bold"> <u>VARIABILI POST</u> </span> <br><br> %s <br><br> <form action="http://localhost:8888" method="post"> <input type="text" name="n" value=""> Nome<br> <input type="text" name="c" value=""> Cognome<br> <input type="text" name="e" value=""> Età<br> <input type="text" name="h" value=""> Hobby<br> <input type="hidden" name="i" value="prova"> <input type="submit" value=" Invia "> </form> </body> </html> ''' #------------------------------------------------------------------------------ # Gestore della richiesta HTTP/WSGI #------------------------------------------------------------------------------ def gest(environ, start_response): try: request_body_size = int(environ["CONTENT_LENGTH"]) except (ValueError, KeyError): request_body_size = 0 request_body = environ["wsgi.input"].read(request_body_size) if PY3: request_body = request_body.decode(ENCODING) query_dict = parse_qs(request_body) query_list = [] for k, v in query_dict.items(): sanitized_list = list(map(escape, v)) query_list.append("<b>%s:</b> %s" % (k, str(sanitized_list))) page = HTML % "<br>\n".join(sorted(query_list)) if PY3: page = page.encode(ENCODING) status = "200 OK" headers = [ ("Content-type", "text/html"), ("Content-Length", str(len(page))) ] start_response(status, headers) return [page] #------------------------------------------------------------------------------ my_server = make_server(ADDR, PORT, gest) my_server.serve_forever()
Tramite la variabile PATH_INFO è possibile determinare la risorsa richiesta, ecco una tabella di URL e valori riportati da PATH_INFO:
http://localhost:8888 / http://localhost:8888/ / http://localhost:8888/pippo/zucca /pippo/zucca http://localhost:8888/pippo/zucca/ /pippo/zucca/ http://localhost:8888/pippo/zucca?p=0 /pippo/zucca http://localhost:8888/pippo/zucca/?p=0 /pippo/zucca/ http://localhost:8888/pippo/topogigio.html /pippo/topogigio.html http://localhost:8888/pippo/topogigio.html?p=2 /pippo/topogigio.html
# -*- coding: utf-8 -*- #------------------------------------------------------------------------------ from wsgiref.simple_server import make_server from cgi import parse_qs, escape import sys #------------------------------------------------------------------------------ PY3 = sys.version_info[0] > 2 ADDR, PORT = "localhost", 8888 # Indirizzo a cui risponde il server ENCODING = "utf-8" #------------------------------------------------------------------------------ HTML = ''' <html> <head> <meta http-equiv="content-type" content="text/html; charset=UTF-8"> </head> <body style="font-size:20px; margin:64px; background-color: rgb(200,200,200)"> <span style="color: red; font-weight:bold"> <u>RISORSA RICHIESTA</u> </span> <br><br> %s </body> </html> ''' #------------------------------------------------------------------------------ # Gestore della richiesta HTTP/WSGI #------------------------------------------------------------------------------ def gest(environ, start_response): page = HTML % environ["PATH_INFO"] if PY3: page = page.encode(ENCODING) status = "200 OK" headers = [ ("Content-type", "text/html"), ("Content-Length", str(len(page))) ] start_response(status, headers) return [page] #------------------------------------------------------------------------------ my_server = make_server(ADDR, PORT, gest) my_server.serve_forever()
WEB server elementare in grado di servire i file più comuni da disco partendo da una directory specificata in WWW_DIR (e sottodirectory).
# -*- coding: utf-8 -*- #------------------------------------------------------------------------------ from wsgiref.simple_server import make_server import sys import os #------------------------------------------------------------------------------ PY3 = sys.version_info[0] > 2 ADDR, PORT = "localhost", 8888 # Indirizzo a cui risponde il server ENCODING = "utf-8" WWW_DIR = "/home/username/Documenti/webtest" TYPE = { "css": "text/css", "gif": "image/gif", "jpg": "image/jpg", "jpeg": "image/jpg", "png": "image/png", "ico": "image/ico", "html": "text/html", "htm": "text/html" } #------------------------------------------------------------------------------ def type_detect(resource_name): parts = resource_name.split('.') if len(parts) > 1: extension = parts[1] return TYPE[extension] if extension in TYPE else "text/plain" else: return "text/plain" #------------------------------------------------------------------------------ # Gestore della richiesta HTTP/WSGI #------------------------------------------------------------------------------ def gest(environ, start_response): resource_path = os.path.join(WWW_DIR, environ["PATH_INFO"][1:]) resource_name = os.path.basename(resource_path) if resource_name: try: data = file(resource_path, "rb").read() content_type = type_detect(resource_name) except IOError: data = "Errore nel recupero della risorsa specificata!" content_type = "text/plain" if PY3: data = data.encode(ENCODING) else: data = "Nessuna risorsa specificata!" content_type = "text/plain" if PY3: data = data.encode(ENCODING) status = "200 OK" headers = [ ("Content-type", content_type), ("Content-Length", str(len(data))) ] start_response(status, headers) return [data] #------------------------------------------------------------------------------ my_server = make_server(ADDR, PORT, gest) my_server.serve_forever()
È possibile richiamare funzioni Python arbitrarie che utilizzano le variabili fornite dal browser. Nell'esempio seguente il WEB server controlla se viene richiesta una risorsa con estensione .py (ma potrebbe essere qualsiasi altra cosa) e in quel caso invece di servire i files da disco chiama una funzione che può effettuare qualsiasi operazione sia necessaria. La funzione deve ritornare i dati per il browser.
# -*- coding: utf-8 -*- #------------------------------------------------------------------------------ from wsgiref.simple_server import make_server from cgi import parse_qs, escape import sys import os #------------------------------------------------------------------------------ PY3 = sys.version_info[0] > 2 ADDR, PORT = "localhost", 8888 # Indirizzo a cui risponde il server ENCODING = "utf-8" WWW_DIR = "/home/username/Documenti/webtest" TYPE = { "css": "text/css", "gif": "image/gif", "jpg": "image/jpg", "jpeg": "image/jpg", "png": "image/png", "ico": "image/ico", "html": "text/html", "htm": "text/html" } #------------------------------------------------------------------------------ def type_detect(resource_name): parts = resource_name.split('.') if len(parts) > 1: extension = parts[1] return TYPE[extension] if extension in TYPE else "text/plain" else: return "text/plain" #------------------------------------------------------------------------------ # Recupero risorsa da disco #------------------------------------------------------------------------------ def get_resource(resource_path): resource_name = os.path.basename(resource_path) if resource_name: try: data = file(resource_path, "rb").read() content_type = type_detect(resource_name) except IOError: data = "Errore nel recupero della risorsa specificata!" content_type = "text/plain" if PY3: data = data.encode(ENCODING) else: data = "Nessuna risorsa specificata!" content_type = "text/plain" if PY3: data = data.encode(ENCODING) return data, content_type #------------------------------------------------------------------------------ # Recupera le variabili sia GET che POST #------------------------------------------------------------------------------ def get_variables(environ): request_method = environ["REQUEST_METHOD"] if request_method == "GET": query_dict = parse_qs(environ["QUERY_STRING"]) elif request_method == "POST": try: request_body_size = int(environ["CONTENT_LENGTH"]) except (ValueError, KeyError): request_body_size = 0 request_body = environ["wsgi.input"].read(request_body_size) if PY3: request_body = request_body.decode(ENCODING) query_dict = parse_qs(request_body) else: query_dict = {} for k, v in query_dict.items(): sanitized_list = list(map(escape, v)) query_dict[k] = sanitized_list return query_dict #------------------------------------------------------------------------------ # Funzione generica per svolgere un qualsiasi compito #------------------------------------------------------------------------------ def app(environ, resource_path): query_dict = get_variables(environ) data = str(query_dict) if PY3: data = data.encode(ENCODING) return data, "text/plain" #------------------------------------------------------------------------------ # Gestore della richiesta HTTP/WSGI #------------------------------------------------------------------------------ def gest(environ, start_response): resource_path = os.path.join(WWW_DIR, environ["PATH_INFO"][1:]) resource_name = os.path.basename(resource_path) if resource_name.endswith(".py"): data, content_type = app(environ, resource_path) else: data, content_type = get_resource(resource_path) status = "200 OK" headers = [ ("Content-type", content_type), ("Content-Length", str(len(data))) ] start_response(status, headers) return [data] #------------------------------------------------------------------------------ my_server = make_server(ADDR, PORT, gest) my_server.serve_forever()
Il programma seguente è un semplice gioco WEB testuale multiutente basato sul server WSGI. Lo stato del gioco è contenuto nelle istanze della classe Gamer (una per ogni giocatore), la funzione wsgi_handler processa le richieste del server (tutte di tipo GET), la logica è gestita tramite state pattern semplificato. La sincronizzazione tra il programma e il browser avviene tramite i campi hidden 'ph' e 'idg' del form HTML che identificano la fase in corso e il giocatore.
# -*- coding: utf8 -*- #------------------------------------------------------------------------------ # Indovina il numero - Esempio di gioco WEB multiutente # Codice compatibile Python2 e Python3 - By C.Fin 2013 #------------------------------------------------------------------------------ import wsgiref.simple_server import random import time import cgi #------------------------------------------------------------------------------ # Costanti e parti HTML non variabili #------------------------------------------------------------------------------ ADDR, PORT = "localhost", 8888 MAX_GAMERS = 20 PAGE_HEAD = (''' <!DOCTYPE html PUBLIC "-//w3c//dtd html 4.0 transitional//en"> <html> <head> <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> </head> <body style="margin:32px; font-size:20px; font-family:monospace; background-color:black; color:lime; font-weight:normal" onload="document.forms[0].nr.focus();"> <h3>Indovina il numero tra 1 e 1000</h3> ''') BUSY_PAGE = (''' <!DOCTYPE html PUBLIC "-//w3c//dtd html 4.0 transitional//en"> <html><head> <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> </head> <h3>Troppi utenti connessi, riprovare piu' tardi</h3> </body></html> ''') PAGE_FINAL = "</body></html>" #------------------------------------------------------------------------------ # Generazione parti HTML variabili #------------------------------------------------------------------------------ def input_form(gamer): return (''' <br><br> <table style="font-size:20px; font-family:monospace; font-weight:normal;" cellpadding="0" cellspacing="0"> <tr> <td valign="top">Immettere numero </td> <td valign="top"> <form action="" method="GET"> <input type="text" value="" size="15" name="nr"> <input type="submit" value="Tenta"> <input type="hidden" name="ph" value="%03d"> <input type="hidden" name="idg" value="%s"> </form> </td> </tr> </table> ''') % (gamer.phase, gamer.idg) def start_form(gamer): return (''' <br> <form action="" method="GET"> <input type="submit" value="Nuova partita"> <input type="hidden" name="idg" value="%s"> </form> ''') % gamer.idg def error_page(gamer): return ( PAGE_HEAD + "<h3>Errore di fase</h3>" + "Non usare mai ricarica o pagina avanti/indietro<br>" + start_form(gamer) + PAGE_FINAL ) #------------------------------------------------------------------------------ # State Pattern #------------------------------------------------------------------------------ class InitialStatus: def __call__(self, gamer, getvars): gamer.secret_nr = random.randint(1, 1000) gamer.phase = 1 gamer.time = time.time() gamer.attempts = [] gamer.page = PAGE_HEAD + input_form(gamer) + PAGE_FINAL gamer.status = GameStatus() return gamer.page #------------------------------------------------------------------------------ class GameStatus: def __call__(self, gamer, getvars): gamer.time = time.time() # controlla se fase esistente e corretta try: phase = int(getvars["ph"][0]) if phase != gamer.phase: raise ValueError except (ValueError, KeyError): gamer.page = error_page(gamer) gamer.status = InitialStatus() return gamer.page # controlla se input numerico esistente e valido try: number = int(getvars["nr"][0]) except (ValueError, KeyError): return gamer.page # controlla se indovinato if number == gamer.secret_nr: gamer.attempts.append(str(number)) gamer.page = ( PAGE_HEAD + "<br>".join(gamer.attempts) + ("<h3>Esatto in %d " % gamer.phase) + ("tentativo" if gamer.phase == 1 else "tentativi") + "</h3>" + start_form(gamer) + PAGE_FINAL ) gamer.status = InitialStatus() return gamer.page # attesa tentativo successivo gamer.phase += 1 gamer.attempts.append( ("%d " % number) + ("Troppo alto" if number > gamer.secret_nr else "Troppo basso") ) gamer.page = ( PAGE_HEAD + "<br>".join(gamer.attempts) + input_form(gamer) + PAGE_FINAL ) return gamer.page #------------------------------------------------------------------------------ # GAMER - Un'istanza per ogni giocatore, attributi principali: # idg id del giocatore # phase fase di gioco attesa nella richiesta HTTP # secret_nr numero da indovinare # time ora ultima transazione # attempts lista tentativi effettuati # page pagina HTML da restituire al browser # status metodo che gestisce la logica del gioco #------------------------------------------------------------------------------ class Gamer: def __init__(self, idg): self.idg = idg self.status = InitialStatus() #------------------------------------------------------------------------------ # Cancella giocatori senza transazioni da 5 minuti #------------------------------------------------------------------------------ def delete_old_gamers(gamers): gamers[:] = [gamer for gamer in gamers if time.time() - gamer.time < 300] #------------------------------------------------------------------------------ # Determina giocatore da servire o crea nuovo giocatore #------------------------------------------------------------------------------ def get_gamer(gamers, getvars): idg_list = [gamer.idg for gamer in gamers] if "idg" in getvars: idg = getvars["idg"][0] if idg in idg_list: return gamers[idg_list.index(idg)] if len(gamers) == MAX_GAMERS: return None # Crea nuovo idg non gia' esistente new_idg = "%06d" % random.randrange(1000000) while new_idg in idg_list: new_idg = "%06d" % random.randrange(1000000) # Crea nuovo giocatore new_gamer = Gamer(new_idg) gamers.append(new_gamer) print("Creato gamer %s, utenti connessi %d " % (new_idg, len(gamers))) return new_gamer #------------------------------------------------------------------------------ # Gestione singola request HTTP #------------------------------------------------------------------------------ def wsgi_handler(environ, start_response, gamers): # Elimina istanze giocatori non piu' attivi delete_old_gamers(gamers) if environ["PATH_INFO"] == "/favicon.ico": # Ignora la richiesta favicon dal browser page = "" else: # Acquisisce variabili query string getvars = cgi.parse_qs(environ["QUERY_STRING"]) for k, v in getvars.items(): getvars[k] = [cgi.escape(x) for x in v] # Riconosce giocatore e gestisce tentativo gamer = get_gamer(gamers, getvars) if gamer: page = gamer.status(gamer, getvars) else: page = BUSY_PAGE # Invia risposta al browser page = page.encode("utf-8") headers = [ ("Content-type", "text/html"), ("Content-length", str(len(page))) ] start_response("200 OK", headers) return [page] #------------------------------------------------------------------------------ # MAIN #------------------------------------------------------------------------------ def main(): gamers = [] server = wsgiref.simple_server.make_server( ADDR, PORT, lambda env, rsp: wsgi_handler(env, rsp, gamers) ) server.serve_forever() #------------------------------------------------------------------------------ main()
Il programma seguente è un server che, su richiesta ajax del browser, genera delle immagini "al volo" restituendole in forma base64 encode. Le immagini sono realizzate tramite la libreria PIL/Pillow per Python2. Grazie all'oggetto buffer BytesIO, che si comporta come un file binario simulato in memoria, le immagini non vengono salvate su disco ma gestite interamente in memoria. Il server provvede anche a inviare al browser la pagina HTML iniziale, e un file Javascript con le funzioni per generare richieste ajax in modo semplificato.
<!DOCTYPE html PUBLIC "-//w3c//dtd html 4.0 transitional//en"> <html> <head> <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> <script type="text/javascript" src="utility.js"></script> <script type="text/javascript"> function move() { ajaxGET("move", handler1); } function handler1(response) { document.getElementById("immagine").innerHTML = response; setTimeout(move, 1000); } </script> </head> <body onload="move()" style="background-color:white;"> <div style="text-align:center; margin:64px;"> <div id="immagine"></div> </div> </body"> </html>
function test_status(request, handler) { if (request.readyState == 4 && request.status == 200) handler(request.responseText); } function CreateXMLHttpRequest() { var newRequest = null; try{ newRequest = new XMLHttpRequest();} catch(e) { try{ newRequest = new ActiveXObject("Msxml2.XMLHTTP");} catch(e){ newRequest = new ActiveXObject("Microsoft.XMLHTTP");} } return newRequest; } function ajaxGET(url, handler) { var newRequest = CreateXMLHttpRequest(); newHandler = function(){ test_status(newRequest, handler); }; newRequest.onreadystatechange = newHandler; url += (url.indexOf("?") == -1) ? "?" : "&"; url += "rand=" + escape(Math.random()); newRequest.open("GET", url); newRequest.send(null); }
#------------------------------------------------------------------------------ # Programma server in Python2.x # Demo aggiornamento immagine tramite richieste ajax #------------------------------------------------------------------------------ import wsgiref.simple_server import base64 import random import io import Image # PIL / Pillow import ImageDraw # PIL / Pillow from math import pi, sin, cos #------------------------------------------------------------------------------ # Creazione immagine #------------------------------------------------------------------------------ class Animation: def __init__(self): self.image = Image.new("RGB", (64, 44), "green") self.drawer = ImageDraw.Draw(self.image) self.buf = io.BytesIO() self.n = 4 def move(self): self.drawer.rectangle((0, 0, 64, 44), outline="green", fill="green") d = 2 * pi / 600 p = 20 for i in range(0, 601): t = d * i r = p * sin(self.n * t) x = r * cos(t) + 32 y = r * sin(t) + 22 self.image.putpixel((int(x), int(y)), 0xFFFF00) self.n = self.n + 1 if self.n < 1000 else 4 self.buf.seek(0) self.image.save(self.buf, format="png") self.buf.seek(0) resp = '%s src="data:image/png;base64,%s">' return resp % ( '<img style="box-shadow:4px 4px 4px #789; border:1px solid white"', base64.b64encode(self.buf.read()) ) #------------------------------------------------------------------------------ # Gestione semplificata singola request HTTP #------------------------------------------------------------------------------ def wsgi_handler(environ, start_response): resource = environ["PATH_INFO"] if resource == "/move": resp = animat.move() elif resource == "/utility.js": resp = open("utility.js", "r").read() else: resp = open("page.html", "r").read() headers = [ ("Content-type", "text/html"), ("Content-length", str(len(resp))) ] start_response("200 OK", headers) return [resp] #------------------------------------------------------------------------------ # MAIN #------------------------------------------------------------------------------ animat = Animation() server = wsgiref.simple_server.make_server("localhost", 8888, wsgi_handler) server.serve_forever()