adding again all

This commit is contained in:
Artur Ventura 2011-07-01 22:28:15 +01:00
parent ceb7affda4
commit 1e5fd9944a
28 changed files with 8717 additions and 0 deletions

1
README Normal file
View File

@ -0,0 +1 @@
I don't know yet if this is even possible. but lets say yes.

BIN
Test.class Normal file

Binary file not shown.

39
app.py Normal file
View File

@ -0,0 +1,39 @@
#! /usr/bin/python
# -*- Mode: Python -*-
# -*- coding: UTF-8 -*-
# Copyright (C) 2009 by Artur Ventura
#
# File: app.py
# Time-stamp: Sun Aug 9 16:30:18 2009
#
# Author: Artur Ventura
#
import web
import commands
urls = (
'/(.*)', 'index',
)
app = web.application(urls, globals())
class index:
def GET(self,filename):
if filename.endswith("favicon.ico"):
web.webapi.notfound()
return ""
if filename == "":
return file("index.html").read()
try:
return file(filename).read()
except:
web.webapi.notfound()
return ""
if __name__ == "__main__":
app.run()

90
constantPool.js Normal file
View File

@ -0,0 +1,90 @@
var CONSTANT_Class = 7;
var CONSTANT_Fieldref = 9;
var CONSTANT_Methodref = 10;
var CONSTANT_InterfaceMethodref = 11;
var CONSTANT_String = 8;
var CONSTANT_Integer = 3;
var CONSTANT_Float = 4;
var CONSTANT_Long = 5;
var CONSTANT_Double = 6;
var CONSTANT_NameAndType = 12;
var CONSTANT_Utf8 = 1;
constUtf8 = function(){
this.str = null;
this.read = ( dStream ) {
StringBuffer strBuf;
int len, charCnt;
byte one_byte;
char one_char;
one_char = '\u0000';
len = readU2( dStream );
strBuf = new StringBuffer();
charCnt = 0;
while (charCnt < len) {
one_byte = (byte)readU1( dStream );
charCnt++;
if ((one_byte >> 7) == 1) {
short tmp;
// its a multi-byte character
tmp = (short)(one_byte & 0x3f); // Bits 5..0 (six bits)
// read the next byte
one_byte = (byte)readU1( dStream );
charCnt++;
tmp = (short)(tmp | ((one_byte & 0x3f) << 6));
if ((one_byte >> 6) == 0x2) {
// We have 12 bits so far, get bits 15..12
one_byte = (byte)readU1( dStream );
charCnt++;
one_byte = (byte)(one_byte & 0xf);
tmp = (short)(tmp | (one_byte << 12));
}
one_char = (char)tmp;
}
else {
one_char = (char)one_byte;
}
strBuf.append(one_char);
} // while
this.str = strBuf.toString();
} // read
};
allocConstEntry = function(tag){
var obj = null;
switch ( tag ) {
case CONSTANT_Utf8:
obj = new constUtf8();
break;
case CONSTANT_Integer:
obj = new constInt();
break;
case CONSTANT_Float:
obj = new constFloat();
break;
case CONSTANT_Long:
obj = new constLong();
break;
case CONSTANT_Double:
obj = new constDouble();
break;
case CONSTANT_Class:
case CONSTANT_String:
obj = new constClass_or_String();
break;
case CONSTANT_Fieldref:
case CONSTANT_Methodref:
case CONSTANT_InterfaceMethodref:
obj = new constRef();
break;
case CONSTANT_NameAndType:
obj = new constName_and_Type_info();
break;
default:
System.out.println("allocConstEntry: bad tag value = " + tag);
break;
} // switch
}

33
index.html Normal file
View File

@ -0,0 +1,33 @@
<html>
<head>
<title>JS JVM</title>
<script src="preload.js" type="text/javascript"></script>
<script src="main.js" type="text/javascript"></script>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
<style>
body{
background:#DDE1ED;
font-family:Arial;
font-size:12px;
}
#background{
background:#DDE1ED;
}
#stick textarea{
margin-top:20px;
height:180px;
width:200px;
background:#FFCC00;
}
.ui-dialog-title{
padding-left:40px;
}
</style>
</head>
<body onload="main()">
<h1>JS JVM</h1>
Open JavaScript Debug Window to read the debug;
</body>
</html>

58
linearDatastream.js Normal file
View File

@ -0,0 +1,58 @@
DataStream = function(data){
this.i = 0;
this.getU = function(size){
switch(size){
case 1:
this.constantPoolCount = new DataView(data,this.i,1).getUint8(0);
this.i += 1;
break;
case 2:
this.constantPoolCount = new DataView(data,this.i,2).getUint16(0);
this.i += 2;
break;
case 4:
this.constantPoolCount = new DataView(data,this.i,4).getUint32(0);
this.i += 4;
break;
case 8:
this.constantPoolCount = new DataView(data,this.i,8).getUint64(0);
this.i += 8;
break;
}
};
this.get = function(size){
switch(size){
case 1:
this.constantPoolCount = new DataView(data,this.i,1).getInt8(0);
this.i += 1;
break;
case 2:
this.constantPoolCount = new DataView(data,this.i,2).getInt16(0);
this.i += 2;
break;
case 4:
this.constantPoolCount = new DataView(data,this.i,4).getInt32(0);
this.i += 4;
break;
case 8:
this.constantPoolCount = new DataView(data,this.i,8).getInt64(0);
this.i += 8;
break;
}
}
this.getUint8 = function () { return this.getU(1) }
this.getU1 = this.getUint8;
this.getUint16 = function () { return this.getU(2) }
this.getU2 = this.getUint16;
this.getUint32 = function () { return this.getU(4) }
this.getU4 = this.getUint32;
this.getUint64 = function () { return this.getU(8) }
this.getU8 = this.getUint64;
this.getInt8 = function () { return this.get(1) }
this.getInt16 = function () { return this.get(2) }
this.getInt32 = function () { return this.get(4) }
this.getInt64 = function () { return this.get(8) }
}

106
main.js Normal file
View File

@ -0,0 +1,106 @@
include("linearDataStream.js");
include("constantPool.js");
function slurpFile (filename, fa) {
var xmlHttpRequest, response, result ;
// ie support if (typeof ActiveXObject == "function") return this.load_binary_ie9(filename, fa);
xmlHttpRequest = new XMLHttpRequest();
xmlHttpRequest.open('GET', filename, false);
if ('mozResponseType' in xmlHttpRequest) {
xmlHttpRequest.mozResponseType = 'arraybuffer';
} else if ('responseType' in xmlHttpRequest) {
xmlHttpRequest.responseType = 'arraybuffer';
} else {
xmlHttpRequest.overrideMimeType('text/plain; charset=x-user-defined');
}
xmlHttpRequest.send(null);
if (xmlHttpRequest.status != 200 && xmlHttpRequest.status != 0) {
throw "Error while loading " + filename;
}
bf = true;
if ('mozResponse' in xmlHttpRequest) {
response = xmlHttpRequest.mozResponse;
} else if (xmlHttpRequest.mozResponseArrayBuffer) {
response = xmlHttpRequest.mozResponseArrayBuffer;
} else if ('responseType' in xmlHttpRequest) {
response = xmlHttpRequest.response;
} else {
response = xmlHttpRequest.responseText;
bf = false;
}
if (bf) {
result = [response.byteLength, response];
} else {
throw "No typed arrays";
}
return result;
};
log = function (msg){
if (console){
console.log(msg);
}
}
ConstantPoolStruct = function (tag,info){
log("0x" + tag.toString(16))
switch(tag){
case 7:
log("Class: "+ info.toString(16));
break;
case 9:
log("FieldRef: "+ info.toString(16));
break;
case 10:
log("MethodRef: "+ info.toString(16));
break;
case 11:
log("InterfaceMethodRef: "+ info.toString(16));
break;
case 8:
log("String: " + info.toString(16));
break;
case 3:
log("Integer: " + info.toString(16));
break;
case 4:
log("Float: " + info.toString(16));
break;
case 5:
log("Long: " + info.toString(16));
break;
case 6:
log("Long: " + info.toString(16));
break;
case 12:
log("NameAndType: " + info.toString(16));
break;
case 1:
log("Utf8: " + info.toString(16));
break;
default:
throw "Unkown tag number 0x" + tag.toString(16);
}
this.tag = tag;
this.info = info;
};
ClassDefinition = function (file){
var dataStream = new DataStream(slurpFile(file)[1]);
this.magic = dataStream.getU4();
this.magic == 0xCAFEBABE || alert("Invalid Class Magic");
this.minorVersion = dataStream.getU2();
this.majorVersion = dataStream.getU2();
this.constantPoolCount = dataStream.getU2();
this.constantPool = [];
for(var i = 1; i <= this.constantPoolCount; i++){
var tag = dataStream.getU1();
var alloc = allocConstEntry(tag);
alloc.read(dataStream);
var info = new DataView(x, 10 + (i-1) * (1 + 1) + 1,1).getUint8(0);
this.constantPool[(i-1)] = new ConstantPoolStruct(tag,info);
}
}
function main (args){
var classDef = new ClassDefinition("Test.class");
}

6
preload.js Normal file
View File

@ -0,0 +1,6 @@
function include(filename)
{
document.write('<script type="text/javascript" src="' + filename +
'"><' + '/script>');
}

34
web/__init__.py Normal file
View File

@ -0,0 +1,34 @@
#!/usr/bin/env python
"""web.py: makes web apps (http://webpy.org)"""
from __future__ import generators
__version__ = "0.31"
__author__ = [
"Aaron Swartz <me@aaronsw.com>",
"Anand Chitipothu <anandology@gmail.com>"
]
__license__ = "public domain"
__contributors__ = "see http://webpy.org/changes"
import utils, db, net, wsgi, http, webapi, httpserver, debugerror
import template, form
import session
from utils import *
from db import *
from net import *
from wsgi import *
from http import *
from webapi import *
from httpserver import *
from debugerror import *
from application import *
from browser import *
import test
try:
import webopenid as openid
except ImportError:
pass # requires openid module

624
web/application.py Normal file
View File

@ -0,0 +1,624 @@
#!/usr/bin/python
"""
Web application
(from web.py)
"""
import webapi as web
import webapi, wsgi, utils
import debugerror
from utils import lstrips, safeunicode
import sys
import urllib
import traceback
import itertools
import os
import re
import types
try:
import wsgiref.handlers
except ImportError:
pass # don't break people with old Pythons
__all__ = [
"application", "auto_application",
"subdir_application", "subdomain_application",
"loadhook", "unloadhook",
"autodelegate"
]
class application:
"""
Application to delegate requests based on path.
>>> urls = ("/hello", "hello")
>>> app = application(urls, globals())
>>> class hello:
... def GET(self): return "hello"
>>>
>>> app.request("/hello").data
'hello'
"""
def __init__(self, mapping=(), fvars={}, autoreload=None):
if autoreload is None:
autoreload = web.config.get('debug', False)
self.mapping = mapping
self.fvars = fvars
self.processors = []
if autoreload:
def main_module_name():
mod = sys.modules['__main__']
file = getattr(mod, '__file__', None) # make sure this works even from python interpreter
return file and os.path.splitext(os.path.basename(file))[0]
def modname(fvars):
"""find name of the module name from fvars."""
file, name = fvars.get('__file__'), fvars.get('__name__')
if file is None or name is None:
return None
if name == '__main__':
# Since the __main__ module can't be reloaded, the module has
# to be imported using its file name.
name = main_module_name()
return name
mapping_name = utils.dictfind(fvars, mapping)
module_name = modname(fvars)
def reload_mapping():
"""loadhook to reload mapping and fvars."""
mod = __import__(module_name)
mapping = getattr(mod, mapping_name, None)
if mapping:
self.fvars = mod.__dict__
self.mapping = mapping
self.add_processor(loadhook(Reloader()))
if mapping_name and module_name:
self.add_processor(loadhook(reload_mapping))
# load __main__ module usings its filename, so that it can be reloaded.
if main_module_name() and '__main__' in sys.argv:
try:
__import__(main_module_name())
except ImportError:
pass
def add_mapping(self, pattern, classname):
self.mapping += (pattern, classname)
def add_processor(self, processor):
"""
Adds a processor to the application.
>>> urls = ("/(.*)", "echo")
>>> app = application(urls, globals())
>>> class echo:
... def GET(self, name): return name
...
>>>
>>> def hello(handler): return "hello, " + handler()
>>> app.add_processor(hello)
>>> app.request("/web.py").data
'hello, web.py'
"""
self.processors.append(processor)
def request(self, localpart='/', method='GET', data=None,
host="0.0.0.0:8080", headers=None, https=False, **kw):
"""Makes request to this application for the specified path and method.
Response will be a storage object with data, status and headers.
>>> urls = ("/hello", "hello")
>>> app = application(urls, globals())
>>> class hello:
... def GET(self):
... web.header('Content-Type', 'text/plain')
... return "hello"
...
>>> response = app.request("/hello")
>>> response.data
'hello'
>>> response.status
'200 OK'
>>> response.headers['Content-Type']
'text/plain'
To use https, use https=True.
>>> urls = ("/redirect", "redirect")
>>> app = application(urls, globals())
>>> class redirect:
... def GET(self): raise web.seeother("/foo")
...
>>> response = app.request("/redirect")
>>> response.headers['Location']
'http://0.0.0.0:8080/foo'
>>> response = app.request("/redirect", https=True)
>>> response.headers['Location']
'https://0.0.0.0:8080/foo'
The headers argument specifies HTTP headers as a mapping object
such as a dict.
>>> urls = ('/ua', 'uaprinter')
>>> class uaprinter:
... def GET(self):
... return 'your user-agent is ' + web.ctx.env['HTTP_USER_AGENT']
...
>>> app = application(urls, globals())
>>> app.request('/ua', headers = {
... 'User-Agent': 'a small jumping bean/1.0 (compatible)'
... }).data
'your user-agent is a small jumping bean/1.0 (compatible)'
"""
path, maybe_query = urllib.splitquery(localpart)
query = maybe_query or ""
if 'env' in kw:
env = kw['env']
else:
env = {}
env = dict(env, HTTP_HOST=host, REQUEST_METHOD=method, PATH_INFO=path, QUERY_STRING=query, HTTPS=str(https))
headers = headers or {}
for k, v in headers.items():
env['HTTP_' + k.upper().replace('-', '_')] = v
if 'HTTP_CONTENT_LENGTH' in env:
env['CONTENT_LENGTH'] = env.pop('HTTP_CONTENT_LENGTH')
if 'HTTP_CONTENT_TYPE' in env:
env['CONTENT_TYPE'] = env.pop('HTTP_CONTENT_TYPE')
if data:
import StringIO
if isinstance(data, dict):
q = urllib.urlencode(data)
else:
q = data
env['wsgi.input'] = StringIO.StringIO(q)
if not env.get('CONTENT_TYPE', '').lower().startswith('multipart/') and 'CONTENT_LENGTH' not in env:
env['CONTENT_LENGTH'] = len(q)
response = web.storage()
def start_response(status, headers):
response.status = status
response.headers = dict(headers)
response.header_items = headers
response.data = "".join(self.wsgifunc(cleanup_threadlocal=False)(env, start_response))
return response
def browser(self):
import browser
return browser.AppBrowser(self)
def handle(self):
fn, args = self._match(self.mapping, web.ctx.path)
return self._delegate(fn, self.fvars, args)
def handle_with_processors(self):
def process(processors):
try:
web.ctx.app_stack.append(self)
if processors:
p, processors = processors[0], processors[1:]
return p(lambda: process(processors))
else:
return self.handle()
except web.HTTPError:
raise
except:
print >> web.debug, traceback.format_exc()
raise self.internalerror()
try:
# processors must be applied in the resvere order. (??)
return process(self.processors)
finally:
web.ctx.app_stack = web.ctx.app_stack[:-1]
def wsgifunc(self, *middleware, **kw):
"""Returns a WSGI-compatible function for this application."""
def peep(iterator):
"""Peeps into an iterator by doing an iteration
and returns an equivalent iterator.
"""
# wsgi requires the headers first
# so we need to do an iteration
# and save the result for later
try:
firstchunk = iterator.next()
except StopIteration:
firstchunk = ''
return itertools.chain([firstchunk], iterator)
def is_generator(x): return x and hasattr(x, 'next')
def wsgi(env, start_resp):
self.load(env)
try:
# allow uppercase methods only
if web.ctx.method.upper() != web.ctx.method:
raise web.nomethod()
result = self.handle_with_processors()
except web.HTTPError, e:
result = e.data
if is_generator(result):
result = peep(result)
else:
result = [utils.utf8(result)]
status, headers = web.ctx.status, web.ctx.headers
start_resp(status, headers)
#@@@
# Since the CherryPy Webserver uses thread pool, the thread-local state is never cleared.
# This interferes with the other requests.
# clearing the thread-local storage to avoid that.
# see utils.ThreadedDict for details
import threading
t = threading.currentThread()
if kw.get('cleanup_threadlocal', True) and hasattr(t, '_d'):
del t._d
return result
for m in middleware:
wsgi = m(wsgi)
return wsgi
def run(self, *middleware):
"""
Starts handling requests. If called in a CGI or FastCGI context, it will follow
that protocol. If called from the command line, it will start an HTTP
server on the port named in the first command line argument, or, if there
is no argument, on port 8080.
`middleware` is a list of WSGI middleware which is applied to the resulting WSGI
function.
"""
return wsgi.runwsgi(self.wsgifunc(*middleware))
def cgirun(self, *middleware):
"""
Return a CGI handler. This is mostly useful with Google App Engine.
There you can just do:
main = app.cgirun()
"""
wsgiapp = self.wsgifunc(*middleware)
try:
from google.appengine.ext.webapp.util import run_wsgi_app
return run_wsgi_app(wsgiapp)
except ImportError:
# we're not running from within Google App Engine
return wsgiref.handlers.CGIHandler().run(wsgiapp)
def load(self, env):
"""Initializes ctx using env."""
ctx = web.ctx
ctx.clear()
ctx.status = '200 OK'
ctx.headers = []
ctx.output = ''
ctx.environ = ctx.env = env
ctx.host = env.get('HTTP_HOST')
if env.get('wsgi.url_scheme') in ['http', 'https']:
ctx.protocol = env['wsgi.url_scheme']
elif env.get('HTTPS', '').lower() in ['on', 'true', '1']:
ctx.protocol = 'https'
else:
ctx.protocol = 'http'
ctx.homedomain = ctx.protocol + '://' + env.get('HTTP_HOST', '[unknown]')
ctx.homepath = os.environ.get('REAL_SCRIPT_NAME', env.get('SCRIPT_NAME', ''))
ctx.home = ctx.homedomain + ctx.homepath
#@@ home is changed when the request is handled to a sub-application.
#@@ but the real home is required for doing absolute redirects.
ctx.realhome = ctx.home
ctx.ip = env.get('REMOTE_ADDR')
ctx.method = env.get('REQUEST_METHOD')
ctx.path = env.get('PATH_INFO')
# http://trac.lighttpd.net/trac/ticket/406 requires:
if env.get('SERVER_SOFTWARE', '').startswith('lighttpd/'):
ctx.path = lstrips(env.get('REQUEST_URI').split('?')[0], ctx.homepath)
if env.get('QUERY_STRING'):
ctx.query = '?' + env.get('QUERY_STRING', '')
else:
ctx.query = ''
ctx.fullpath = ctx.path + ctx.query
for k, v in ctx.iteritems():
if isinstance(v, str):
ctx[k] = safeunicode(v)
# status must always be str
ctx.status = '200 OK'
ctx.app_stack = []
def _delegate(self, f, fvars, args=[]):
def handle_class(cls):
meth = web.ctx.method
if meth == 'HEAD' and not hasattr(cls, meth):
meth = 'GET'
if not hasattr(cls, meth):
raise web.nomethod(cls)
tocall = getattr(cls(), meth)
return tocall(*args)
def is_class(o): return isinstance(o, (types.ClassType, type))
if f is None:
raise web.notfound()
elif isinstance(f, application):
return f.handle_with_processors()
elif is_class(f):
return handle_class(f)
elif isinstance(f, basestring):
if f.startswith('redirect '):
url = f.split(' ', 1)[1]
if web.ctx.method == "GET":
x = web.ctx.env.get('QUERY_STRING', '')
if x:
url += '?' + x
raise web.redirect(url)
elif '.' in f:
x = f.split('.')
mod, cls = '.'.join(x[:-1]), x[-1]
mod = __import__(mod, globals(), locals(), [""])
cls = getattr(mod, cls)
else:
cls = fvars[f]
return handle_class(cls)
elif hasattr(f, '__call__'):
return f()
else:
return web.notfound()
def _match(self, mapping, value):
for pat, what in utils.group(mapping, 2):
if isinstance(what, application):
if value.startswith(pat):
f = lambda: self._delegate_sub_application(pat, what)
return f, None
else:
continue
elif isinstance(what, basestring):
what, result = utils.re_subm('^' + pat + '$', what, value)
else:
result = utils.re_compile('^' + pat + '$').match(value)
if result: # it's a match
return what, [x and urllib.unquote(x) for x in result.groups()]
return None, None
def _delegate_sub_application(self, dir, app):
"""Deletes request to sub application `app` rooted at the directory `dir`.
The home, homepath, path and fullpath values in web.ctx are updated to mimic request
to the subapp and are restored after it is handled.
@@Any issues with when used with yield?
"""
try:
oldctx = web.storage(web.ctx)
web.ctx.home += dir
web.ctx.homepath += dir
web.ctx.path = web.ctx.path[len(dir):]
web.ctx.fullpath = web.ctx.fullpath[len(dir):]
return app.handle_with_processors()
finally:
web.ctx.home = oldctx.home
web.ctx.homepath = oldctx.homepath
web.ctx.path = oldctx.path
web.ctx.fullpath = oldctx.fullpath
def get_parent_app(self):
if self in web.ctx.app_stack:
index = web.ctx.app_stack.index(self)
if index > 0:
return web.ctx.app_stack[index-1]
def notfound(self):
"""Returns HTTPError with '404 not found' message"""
parent = self.get_parent_app()
if parent:
return parent.notfound()
else:
return web._NotFound()
def internalerror(self):
"""Returns HTTPError with '500 internal error' message"""
parent = self.get_parent_app()
if parent:
return parent.internalerror()
elif web.config.get('debug'):
import debugerror
return debugerror.debugerror()
else:
return web._InternalError()
class auto_application(application):
"""Application similar to `application` but urls are constructed
automatiacally using metaclass.
>>> app = auto_application()
>>> class hello(app.page):
... def GET(self): return "hello, world"
...
>>> class foo(app.page):
... path = '/foo/.*'
... def GET(self): return "foo"
>>> app.request("/hello").data
'hello, world'
>>> app.request('/foo/bar').data
'foo'
"""
def __init__(self):
application.__init__(self)
class metapage(type):
def __init__(klass, name, bases, attrs):
type.__init__(klass, name, bases, attrs)
path = attrs.get('path', '/' + name)
# path can be specified as None to ignore that class
# typically required to create a abstract base class.
if path is not None:
self.add_mapping(path, klass)
class page:
path = None
__metaclass__ = metapage
self.page = page
# The application class already has the required functionality of subdir_application
subdir_application = application
class subdomain_application(application):
"""
Application to delegate requests based on the host.
>>> urls = ("/hello", "hello")
>>> app = application(urls, globals())
>>> class hello:
... def GET(self): return "hello"
>>>
>>> mapping = ("hello.example.com", app)
>>> app2 = subdomain_application(mapping)
>>> app2.request("/hello", host="hello.example.com").data
'hello'
>>> response = app2.request("/hello", host="something.example.com")
>>> response.status
'404 Not Found'
>>> response.data
'not found'
"""
def handle(self):
host = web.ctx.host.split(':')[0] #strip port
fn, args = self._match(self.mapping, host)
return self._delegate(fn, self.fvars, args)
def _match(self, mapping, value):
for pat, what in utils.group(mapping, 2):
if isinstance(what, basestring):
what, result = utils.re_subm('^' + pat + '$', what, value)
else:
result = utils.re_compile('^' + pat + '$').match(value)
if result: # it's a match
return what, [x and urllib.unquote(x) for x in result.groups()]
return None, None
def loadhook(h):
"""
Converts a load hook into an application processor.
>>> app = auto_application()
>>> def f(): "something done before handling request"
...
>>> app.add_processor(loadhook(f))
"""
def processor(handler):
h()
return handler()
return processor
def unloadhook(h):
"""
Converts an unload hook into an application processor.
>>> app = auto_application()
>>> def f(): "something done after handling request"
...
>>> app.add_processor(unloadhook(f))
"""
def processor(handler):
try:
return handler()
finally:
h()
return processor
def autodelegate(prefix=''):
"""
Returns a method that takes one argument and calls the method named prefix+arg,
calling `notfound()` if there isn't one. Example:
urls = ('/prefs/(.*)', 'prefs')
class prefs:
GET = autodelegate('GET_')
def GET_password(self): pass
def GET_privacy(self): pass
`GET_password` would get called for `/prefs/password` while `GET_privacy` for
`GET_privacy` gets called for `/prefs/privacy`.
If a user visits `/prefs/password/change` then `GET_password(self, '/change')`
is called.
"""
def internal(self, arg):
if '/' in arg:
first, rest = arg.split('/', 1)
func = prefix + first
args = ['/' + rest]
else:
func = prefix + arg
args = []
if hasattr(self, func):
try:
return getattr(self, func)(*args)
except TypeError:
return web.notfound()
else:
return web.notfound()
return internal
class Reloader:
"""Checks to see if any loaded modules have changed on disk and,
if so, reloads them.
"""
def __init__(self):
self.mtimes = {}
def __call__(self):
for mod in sys.modules.values():
self.check(mod)
def check(self, mod):
try:
mtime = os.stat(mod.__file__).st_mtime
except (AttributeError, OSError, IOError):
return
if mod.__file__.endswith('.pyc') and os.path.exists(mod.__file__[:-1]):
mtime = max(os.stat(mod.__file__[:-1]).st_mtime, mtime)
if mod not in self.mtimes:
self.mtimes[mod] = mtime
elif self.mtimes[mod] < mtime:
try:
reload(mod)
self.mtimes[mod] = mtime
except ImportError:
pass
if __name__ == "__main__":
import doctest
doctest.testmod()

232
web/browser.py Normal file
View File

@ -0,0 +1,232 @@
"""Browser to test web applications.
(from web.py)
"""
from utils import re_compile
from net import htmlunquote
import httplib, urllib, urllib2
import cookielib
import copy
from StringIO import StringIO
DEBUG = False
__all__ = [
"BrowserError",
"Browser", "AppBrowser",
"AppHandler"
]
class BrowserError(Exception):
pass
class Browser:
def __init__(self):
self.cookiejar = cookielib.CookieJar()
self._cookie_processor = urllib2.HTTPCookieProcessor(self.cookiejar)
self.form = None
self.url = "http://0.0.0.0:8080/"
self.path = "/"
self.status = None
self.data = None
self._response = None
self._forms = None
def reset(self):
"""Clears all cookies and history."""
self.cookiejar.clear()
def build_opener(self):
"""Builds the opener using urllib2.build_opener.
Subclasses can override this function to prodive custom openers.
"""
return urllib2.build_opener()
def do_request(self, req):
if DEBUG:
print 'requesting', req.get_method(), req.get_full_url()
opener = self.build_opener()
opener.add_handler(self._cookie_processor)
try:
self._response = opener.open(req)
except urllib2.HTTPError, e:
self._response = e
self.url = self._response.geturl()
self.path = urllib2.Request(self.url).get_selector()
self.data = self._response.read()
self.status = self._response.code
self._forms = None
self.form = None
return self.get_response()
def open(self, url, data=None, headers={}):
"""Opens the specified url."""
url = urllib.basejoin(self.url, url)
req = urllib2.Request(url, data, headers)
return self.do_request(req)
def show(self):
"""Opens the current page in real web browser."""
f = open('page.html', 'w')
f.write(self.data)
f.close()
import webbrowser, os
url = 'file://' + os.path.abspath('page.html')
webbrowser.open(url)
def get_response(self):
"""Returns a copy of the current response."""
return urllib.addinfourl(StringIO(self.data), self._response.info(), self._response.geturl())
def get_soup(self):
"""Returns beautiful soup of the current document."""
import BeautifulSoup
return BeautifulSoup.BeautifulSoup(self.data)
def get_text(self, e=None):
"""Returns content of e or the current document as plain text."""
e = e or self.get_soup()
return ''.join([htmlunquote(c) for c in e.recursiveChildGenerator() if isinstance(c, unicode)])
def _get_links(self):
soup = self.get_soup()
return [a for a in soup.findAll(name='a')]
def get_links(self, text=None, text_regex=None, url=None, url_regex=None, predicate=None):
"""Returns all links in the document."""
return self._filter_links(self._get_links(),
text=text, text_regex=text_regex, url=url, url_regex=url_regex, predicate=predicate)
def follow_link(self, link=None, text=None, text_regex=None, url=None, url_regex=None, predicate=None):
if link is None:
links = self._filter_links(self.get_links(),
text=text, text_regex=text_regex, url=url, url_regex=url_regex, predicate=predicate)
link = links and links[0]
if link:
return self.open(link['href'])
else:
raise BrowserError("No link found")
def find_link(self, text=None, text_regex=None, url=None, url_regex=None, predicate=None):
links = self._filter_links(self.get_links(),
text=text, text_regex=text_regex, url=url, url_regex=url_regex, predicate=predicate)
return links and links[0] or None
def _filter_links(self, links,
text=None, text_regex=None,
url=None, url_regex=None,
predicate=None):
predicates = []
if text is not None:
predicates.append(lambda link: link.string == text)
if text_regex is not None:
predicates.append(lambda link: re_compile(text_regex).search(link.string or ''))
if url is not None:
predicates.append(lambda link: link.get('href') == url)
if url_regex is not None:
predicates.append(lambda link: re_compile(url_regex).search(link.get('href', '')))
if predicate:
predicate.append(predicate)
def f(link):
for p in predicates:
if not p(link):
return False
return True
return [link for link in links if f(link)]
def get_forms(self):
"""Returns all forms in the current document.
The returned form objects implement the ClientForm.HTMLForm interface.
"""
if self._forms is None:
import ClientForm
self._forms = ClientForm.ParseResponse(self.get_response(), backwards_compat=False)
return self._forms
def select_form(self, name=None, predicate=None, index=0):
"""Selects the specified form."""
forms = self.get_forms()
if name is not None:
forms = [f for f in forms if f.name == name]
if predicate:
forms = [f for f in forms if predicate(f)]
if forms:
self.form = forms[index]
return self.form
else:
raise BrowserError("No form selected.")
def submit(self):
"""submits the currently selected form."""
if self.form is None:
raise BrowserError("No form selected.")
req = self.form.click()
return self.do_request(req)
def __getitem__(self, key):
return self.form[key]
def __setitem__(self, key, value):
self.form[key] = value
class AppBrowser(Browser):
"""Browser interface to test web.py apps.
b = AppBrowser(app)
b.open('/')
b.follow_link(text='Login')
b.select_form(name='login')
b['username'] = 'joe'
b['password'] = 'secret'
b.submit()
assert b.path == '/'
assert 'Welcome joe' in b.get_text()
"""
def __init__(self, app):
Browser.__init__(self)
self.app = app
def build_opener(self):
return urllib2.build_opener(AppHandler(self.app))
class AppHandler(urllib2.HTTPHandler):
"""urllib2 handler to handle requests using web.py application."""
handler_order = 100
def __init__(self, app):
self.app = app
def http_open(self, req):
result = self.app.request(
localpart=req.get_selector(),
method=req.get_method(),
host=req.get_host(),
data=req.get_data(),
headers=dict(req.header_items()),
https=req.get_type() == "https"
)
return self._make_response(result, req.get_full_url())
def https_open(self, req):
return self.http_open(req)
https_request = urllib2.HTTPHandler.do_request_
def _make_response(self, result, url):
data = "\r\n".join(["%s: %s" % (k, v) for k, v in result.header_items])
headers = httplib.HTTPMessage(StringIO(data))
response = urllib.addinfourl(StringIO(result.data), headers, url)
code, msg = result.status.split(None, 1)
response.code, response.msg = int(code), msg
return response

0
web/contrib/__init__.py Normal file
View File

127
web/contrib/template.py Normal file
View File

@ -0,0 +1,127 @@
"""
Interface to various templating engines.
"""
import os.path
__all__ = [
"render_cheetah", "render_genshi", "render_mako",
"cache",
]
class render_cheetah:
"""Rendering interface to Cheetah Templates.
Example:
render = render_cheetah('templates')
render.hello(name="cheetah")
"""
def __init__(self, path):
# give error if Chetah is not installed
from Cheetah.Template import Template
self.path = path
def __getattr__(self, name):
from Cheetah.Template import Template
path = os.path.join(self.path, name + ".html")
def template(**kw):
t = Template(file=path, searchList=[kw])
return t.respond()
return template
class render_genshi:
"""Rendering interface genshi templates.
Example:
for xml/html templates.
render = render_genshi(['templates/'])
render.hello(name='genshi')
For text templates:
render = render_genshi(['templates/'], type='text')
render.hello(name='genshi')
"""
def __init__(self, *a, **kwargs):
from genshi.template import TemplateLoader
self._type = kwargs.pop('type', None)
self._loader = TemplateLoader(*a, **kwargs)
def __getattr__(self, name):
# Assuming all templates are html
path = name + ".html"
if self._type == "text":
from genshi.template import TextTemplate
cls = TextTemplate
type = "text"
else:
cls = None
type = None
t = self._loader.load(path, cls=cls)
def template(**kw):
stream = t.generate(**kw)
if type:
return stream.render(type)
else:
return stream.render()
return template
class render_jinja:
"""Rendering interface to Jinja2 Templates
Example:
render= render_jinja('templates')
render.hello(name='jinja2')
"""
def __init__(self, *a, **kwargs):
from jinja2 import Environment,FileSystemLoader
self._lookup = Environment(loader=FileSystemLoader(*a, **kwargs))
def __getattr__(self, name):
# Assuming all templates end with .html
path = name + '.html'
t = self._lookup.get_template(path)
return t.render
class render_mako:
"""Rendering interface to Mako Templates.
Example:
render = render_mako(directories=['templates'])
render.hello(name="mako")
"""
def __init__(self, *a, **kwargs):
from mako.lookup import TemplateLookup
self._lookup = TemplateLookup(*a, **kwargs)
def __getattr__(self, name):
# Assuming all templates are html
path = name + ".html"
t = self._lookup.get_template(path)
return t.render
class cache:
"""Cache for any rendering interface.
Example:
render = cache(render_cheetah("templates/"))
render.hello(name='cache')
"""
def __init__(self, render):
self._render = render
self._cache = {}
def __getattr__(self, name):
if name not in self._cache:
self._cache[name] = getattr(self._render, name)
return self._cache[name]

1122
web/db.py Normal file

File diff suppressed because it is too large Load Diff

355
web/debugerror.py Normal file
View File

@ -0,0 +1,355 @@
"""
pretty debug errors
(part of web.py)
portions adapted from Django <djangoproject.com>
Copyright (c) 2005, the Lawrence Journal-World
Used under the modified BSD license:
http://www.xfree86.org/3.3.6/COPYRIGHT2.html#5
"""
__all__ = ["debugerror", "djangoerror", "emailerrors"]
import sys, urlparse, pprint, traceback
from net import websafe
from template import Template
from utils import sendmail
import webapi as web
import os, os.path
whereami = os.path.join(os.getcwd(), __file__)
whereami = os.path.sep.join(whereami.split(os.path.sep)[:-1])
djangoerror_t = """\
$def with (exception_type, exception_value, frames)
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html lang="en">
<head>
<meta http-equiv="content-type" content="text/html; charset=utf-8" />
<meta name="robots" content="NONE,NOARCHIVE" />
<title>$exception_type at $ctx.path</title>
<style type="text/css">
html * { padding:0; margin:0; }
body * { padding:10px 20px; }
body * * { padding:0; }
body { font:small sans-serif; }
body>div { border-bottom:1px solid #ddd; }
h1 { font-weight:normal; }
h2 { margin-bottom:.8em; }
h2 span { font-size:80%; color:#666; font-weight:normal; }
h3 { margin:1em 0 .5em 0; }
h4 { margin:0 0 .5em 0; font-weight: normal; }
table {
border:1px solid #ccc; border-collapse: collapse; background:white; }
tbody td, tbody th { vertical-align:top; padding:2px 3px; }
thead th {
padding:1px 6px 1px 3px; background:#fefefe; text-align:left;
font-weight:normal; font-size:11px; border:1px solid #ddd; }
tbody th { text-align:right; color:#666; padding-right:.5em; }
table.vars { margin:5px 0 2px 40px; }
table.vars td, table.req td { font-family:monospace; }
table td.code { width:100%;}
table td.code div { overflow:hidden; }
table.source th { color:#666; }
table.source td {
font-family:monospace; white-space:pre; border-bottom:1px solid #eee; }
ul.traceback { list-style-type:none; }
ul.traceback li.frame { margin-bottom:1em; }
div.context { margin: 10px 0; }
div.context ol {
padding-left:30px; margin:0 10px; list-style-position: inside; }
div.context ol li {
font-family:monospace; white-space:pre; color:#666; cursor:pointer; }
div.context ol.context-line li { color:black; background-color:#ccc; }
div.context ol.context-line li span { float: right; }
div.commands { margin-left: 40px; }
div.commands a { color:black; text-decoration:none; }
#summary { background: #ffc; }
#summary h2 { font-weight: normal; color: #666; }
#explanation { background:#eee; }
#template, #template-not-exist { background:#f6f6f6; }
#template-not-exist ul { margin: 0 0 0 20px; }
#traceback { background:#eee; }
#requestinfo { background:#f6f6f6; padding-left:120px; }
#summary table { border:none; background:transparent; }
#requestinfo h2, #requestinfo h3 { position:relative; margin-left:-100px; }
#requestinfo h3 { margin-bottom:-1em; }
.error { background: #ffc; }
.specific { color:#cc3300; font-weight:bold; }
</style>
<script type="text/javascript">
//<!--
function getElementsByClassName(oElm, strTagName, strClassName){
// Written by Jonathan Snook, http://www.snook.ca/jon;
// Add-ons by Robert Nyman, http://www.robertnyman.com
var arrElements = (strTagName == "*" && document.all)? document.all :
oElm.getElementsByTagName(strTagName);
var arrReturnElements = new Array();
strClassName = strClassName.replace(/\-/g, "\\-");
var oRegExp = new RegExp("(^|\\s)" + strClassName + "(\\s|$$)");
var oElement;
for(var i=0; i<arrElements.length; i++){
oElement = arrElements[i];
if(oRegExp.test(oElement.className)){
arrReturnElements.push(oElement);
}
}
return (arrReturnElements)
}
function hideAll(elems) {
for (var e = 0; e < elems.length; e++) {
elems[e].style.display = 'none';
}
}
window.onload = function() {
hideAll(getElementsByClassName(document, 'table', 'vars'));
hideAll(getElementsByClassName(document, 'ol', 'pre-context'));
hideAll(getElementsByClassName(document, 'ol', 'post-context'));
}
function toggle() {
for (var i = 0; i < arguments.length; i++) {
var e = document.getElementById(arguments[i]);
if (e) {
e.style.display = e.style.display == 'none' ? 'block' : 'none';
}
}
return false;
}
function varToggle(link, id) {
toggle('v' + id);
var s = link.getElementsByTagName('span')[0];
var uarr = String.fromCharCode(0x25b6);
var darr = String.fromCharCode(0x25bc);
s.innerHTML = s.innerHTML == uarr ? darr : uarr;
return false;
}
//-->
</script>
</head>
<body>
$def dicttable (d, kls='req', id=None):
$ items = d and d.items() or []
$items.sort()
$:dicttable_items(items, kls, id)
$def dicttable_items(items, kls='req', id=None):
$if items:
<table class="$kls"
$if id: id="$id"
><thead><tr><th>Variable</th><th>Value</th></tr></thead>
<tbody>
$for k, v in items:
<tr><td>$k</td><td class="code"><div>$prettify(v)</div></td></tr>
</tbody>
</table>
$else:
<p>No data.</p>
<div id="summary">
<h1>$exception_type at $ctx.path</h1>
<h2>$exception_value</h2>
<table><tr>
<th>Python</th>
<td>$frames[0].filename in $frames[0].function, line $frames[0].lineno</td>
</tr><tr>
<th>Web</th>
<td>$ctx.method $ctx.home$ctx.path</td>
</tr></table>
</div>
<div id="traceback">
<h2>Traceback <span>(innermost first)</span></h2>
<ul class="traceback">
$for frame in frames:
<li class="frame">
<code>$frame.filename</code> in <code>$frame.function</code>
$if frame.context_line:
<div class="context" id="c$frame.id">
$if frame.pre_context:
<ol start="$frame.pre_context_lineno" class="pre-context" id="pre$frame.id">
$for line in frame.pre_context:
<li onclick="toggle('pre$frame.id', 'post$frame.id')">$line</li>
</ol>
<ol start="$frame.lineno" class="context-line"><li onclick="toggle('pre$frame.id', 'post$frame.id')">$frame.context_line <span>...</span></li></ol>
$if frame.post_context:
<ol start='${frame.lineno + 1}' class="post-context" id="post$frame.id">
$for line in frame.post_context:
<li onclick="toggle('pre$frame.id', 'post$frame.id')">$line</li>
</ol>
</div>
$if frame.vars:
<div class="commands">
<a href='#' onclick="return varToggle(this, '$frame.id')"><span>&#x25b6;</span> Local vars</a>
$# $inspect.formatargvalues(*inspect.getargvalues(frame['tb'].tb_frame))
</div>
$:dicttable(frame.vars, kls='vars', id=('v' + str(frame.id)))
</li>
</ul>
</div>
<div id="requestinfo">
$if ctx.output or ctx.headers:
<h2>Response so far</h2>
<h3>HEADERS</h3>
$:dicttable_items(ctx.headers)
<h3>BODY</h3>
<p class="req" style="padding-bottom: 2em"><code>
$ctx.output
</code></p>
<h2>Request information</h2>
<h3>INPUT</h3>
$:dicttable(web.input())
<h3 id="cookie-info">COOKIES</h3>
$:dicttable(web.cookies())
<h3 id="meta-info">META</h3>
$ newctx = [(k, v) for (k, v) in ctx.iteritems() if not k.startswith('_') and not isinstance(v, dict)]
$:dicttable(dict(newctx))
<h3 id="meta-info">ENVIRONMENT</h3>
$:dicttable(ctx.env)
</div>
<div id="explanation">
<p>
You're seeing this error because you have <code>web.config.debug</code>
set to <code>True</code>. Set that to <code>False</code> if you don't to see this.
</p>
</div>
</body>
</html>
"""
djangoerror_r = None
def djangoerror():
def _get_lines_from_file(filename, lineno, context_lines):
"""
Returns context_lines before and after lineno from file.
Returns (pre_context_lineno, pre_context, context_line, post_context).
"""
try:
source = open(filename).readlines()
lower_bound = max(0, lineno - context_lines)
upper_bound = lineno + context_lines
pre_context = \
[line.strip('\n') for line in source[lower_bound:lineno]]
context_line = source[lineno].strip('\n')
post_context = \
[line.strip('\n') for line in source[lineno + 1:upper_bound]]
return lower_bound, pre_context, context_line, post_context
except (OSError, IOError):
return None, [], None, []
exception_type, exception_value, tback = sys.exc_info()
frames = []
while tback is not None:
filename = tback.tb_frame.f_code.co_filename
function = tback.tb_frame.f_code.co_name
lineno = tback.tb_lineno - 1
pre_context_lineno, pre_context, context_line, post_context = \
_get_lines_from_file(filename, lineno, 7)
frames.append(web.storage({
'tback': tback,
'filename': filename,
'function': function,
'lineno': lineno,
'vars': tback.tb_frame.f_locals,
'id': id(tback),
'pre_context': pre_context,
'context_line': context_line,
'post_context': post_context,
'pre_context_lineno': pre_context_lineno,
}))
tback = tback.tb_next
frames.reverse()
urljoin = urlparse.urljoin
def prettify(x):
try:
out = pprint.pformat(x)
except Exception, e:
out = '[could not display: <' + e.__class__.__name__ + \
': '+str(e)+'>]'
return out
global djangoerror_r
if djangoerror_r is None:
djangoerror_r = Template(djangoerror_t, filename=__file__, filter=websafe)
t = djangoerror_r
globals = {'ctx': web.ctx, 'web':web, 'dict':dict, 'str':str, 'prettify': prettify}
t.t.func_globals.update(globals)
return t(exception_type, exception_value, frames)
def debugerror():
"""
A replacement for `internalerror` that presents a nice page with lots
of debug information for the programmer.
(Based on the beautiful 500 page from [Django](http://djangoproject.com/),
designed by [Wilson Miner](http://wilsonminer.com/).)
"""
return web._InternalError(djangoerror())
def emailerrors(email_address, olderror):
"""
Wraps the old `internalerror` handler (pass as `olderror`) to
additionally email all errors to `email_address`, to aid in
debugging production websites.
Emails contain a normal text traceback as well as an
attachment containing the nice `debugerror` page.
"""
def emailerrors_internal():
error = olderror()
tb = sys.exc_info()
error_name = tb[0]
error_value = tb[1]
tb_txt = ''.join(traceback.format_exception(*tb))
path = web.ctx.path
request = web.ctx.method+' '+web.ctx.home+web.ctx.fullpath
eaddr = email_address
text = ("""\
------here----
Content-Type: text/plain
Content-Disposition: inline
%(request)s
%(tb_txt)s
------here----
Content-Type: text/html; name="bug.html"
Content-Disposition: attachment; filename="bug.html"
""" % locals()) + str(djangoerror())
sendmail(
"your buggy site <%s>" % eaddr,
"the bugfixer <%s>" % eaddr,
"bug: %(error_name)s: %(error_value)s (%(path)s)" % locals(),
text,
headers={'Content-Type': 'multipart/mixed; boundary="----here----"'})
return error
return emailerrors_internal
if __name__ == "__main__":
urls = (
'/', 'index'
)
from application import application
app = application(urls, globals())
app.internalerror = debugerror
class index:
def GET(self):
thisdoesnotexist
app.run()

264
web/form.py Normal file
View File

@ -0,0 +1,264 @@
"""
HTML forms
(part of web.py)
"""
import copy, re
import webapi as web
import utils, net
def attrget(obj, attr, value=None):
if hasattr(obj, 'has_key') and obj.has_key(attr): return obj[attr]
if hasattr(obj, attr): return getattr(obj, attr)
return value
class Form:
r"""
HTML form.
>>> f = Form(Textbox("x"))
>>> f.render()
'<table>\n <tr><th><label for="x">x</label></th><td><input type="text" name="x" id="x" /></td></tr>\n</table>'
"""
def __init__(self, *inputs, **kw):
self.inputs = inputs
self.valid = True
self.note = None
self.validators = kw.pop('validators', [])
def __call__(self, x=None):
o = copy.deepcopy(self)
if x: o.validates(x)
return o
def render(self):
out = ''
out += self.rendernote(self.note)
out += '<table>\n'
for i in self.inputs:
out += ' <tr><th><label for="%s">%s</label></th>' % (i.id, net.websafe(i.description))
out += "<td>"+i.pre+i.render()+i.post+"</td></tr>\n"
out += "</table>"
return out
def render_css(self):
out = []
out.append(self.rendernote(self.note))
for i in self.inputs:
out.append('<label for="%s">%s</label>' % (i.id, net.websafe(i.description)))
out.append(i.pre)
out.append(i.render())
out.append(i.post)
out.append('\n')
return ''.join(out)
def rendernote(self, note):
if note: return '<strong class="wrong">%s</strong>' % net.websafe(note)
else: return ""
def validates(self, source=None, _validate=True, **kw):
source = source or kw or web.input()
out = True
for i in self.inputs:
v = attrget(source, i.name)
if _validate:
out = i.validate(v) and out
else:
i.value = v
if _validate:
out = out and self._validate(source)
self.valid = out
return out
def _validate(self, value):
self.value = value
for v in self.validators:
if not v.valid(value):
self.note = v.msg
return False
return True
def fill(self, source=None, **kw):
return self.validates(source, _validate=False, **kw)
def __getitem__(self, i):
for x in self.inputs:
if x.name == i: return x
raise KeyError, i
def __getattr__(self, name):
# don't interfere with deepcopy
inputs = self.__dict__.get('inputs') or []
for x in inputs:
if x.name == name: return x
raise AttributeError, name
def get(self, i, default=None):
try:
return self[i]
except KeyError:
return default
def _get_d(self): #@@ should really be form.attr, no?
return utils.storage([(i.name, i.value) for i in self.inputs])
d = property(_get_d)
class Input(object):
def __init__(self, name, *validators, **attrs):
self.description = attrs.pop('description', name)
self.value = attrs.pop('value', None)
self.pre = attrs.pop('pre', "")
self.post = attrs.pop('post', "")
self.id = attrs.setdefault('id', name)
if 'class_' in attrs:
attrs['class'] = attrs['class_']
del attrs['class_']
self.name, self.validators, self.attrs, self.note = name, validators, attrs, None
def validate(self, value):
self.value = value
for v in self.validators:
if not v.valid(value):
self.note = v.msg
return False
return True
def render(self): raise NotImplementedError
def rendernote(self, note):
if note: return '<strong class="wrong">%s</strong>' % net.websafe(note)
else: return ""
def addatts(self):
str = ""
for (n, v) in self.attrs.items():
str += ' %s="%s"' % (n, net.websafe(v))
return str
#@@ quoting
class Textbox(Input):
def render(self, shownote=True):
x = '<input type="text" name="%s"' % net.websafe(self.name)
if self.value: x += ' value="%s"' % net.websafe(self.value)
x += self.addatts()
x += ' />'
if shownote:
x += self.rendernote(self.note)
return x
class Password(Input):
def render(self):
x = '<input type="password" name="%s"' % net.websafe(self.name)
if self.value: x += ' value="%s"' % net.websafe(self.value)
x += self.addatts()
x += ' />'
x += self.rendernote(self.note)
return x
class Textarea(Input):
def render(self):
x = '<textarea name="%s"' % net.websafe(self.name)
x += self.addatts()
x += '>'
if self.value is not None: x += net.websafe(self.value)
x += '</textarea>'
x += self.rendernote(self.note)
return x
class Dropdown(Input):
def __init__(self, name, args, *validators, **attrs):
self.args = args
super(Dropdown, self).__init__(name, *validators, **attrs)
def render(self):
x = '<select name="%s"%s>\n' % (net.websafe(self.name), self.addatts())
for arg in self.args:
if type(arg) == tuple:
value, desc= arg
else:
value, desc = arg, arg
if self.value == value: select_p = ' selected="selected"'
else: select_p = ''
x += ' <option %s value="%s">%s</option>\n' % (select_p, net.websafe(value), net.websafe(desc))
x += '</select>\n'
x += self.rendernote(self.note)
return x
class Radio(Input):
def __init__(self, name, args, *validators, **attrs):
self.args = args
super(Radio, self).__init__(name, *validators, **attrs)
def render(self):
x = '<span>'
for arg in self.args:
if self.value == arg: select_p = ' checked="checked"'
else: select_p = ''
x += '<input type="radio" name="%s" value="%s"%s%s /> %s ' % (net.websafe(self.name), net.websafe(arg), select_p, self.addatts(), net.websafe(arg))
x += '</span>'
x += self.rendernote(self.note)
return x
class Checkbox(Input):
def render(self):
x = '<input name="%s" type="checkbox"' % net.websafe(self.name)
if self.value: x += ' checked="checked"'
x += self.addatts()
x += ' />'
x += self.rendernote(self.note)
return x
class Button(Input):
def __init__(self, name, *validators, **attrs):
super(Button, self).__init__(name, *validators, **attrs)
self.description = ""
def render(self):
safename = net.websafe(self.name)
x = '<button name="%s"%s>%s</button>' % (safename, self.addatts(), safename)
x += self.rendernote(self.note)
return x
class Hidden(Input):
def __init__(self, name, *validators, **attrs):
super(Hidden, self).__init__(name, *validators, **attrs)
# it doesnt make sence for a hidden field to have description
self.description = ""
def render(self):
x = '<input type="hidden" name="%s"' % net.websafe(self.name)
if self.value: x += ' value="%s"' % net.websafe(self.value)
x += self.addatts()
x += ' />'
return x
class File(Input):
def render(self):
x = '<input type="file" name="%s"' % net.websafe(self.name)
x += self.addatts()
x += ' />'
x += self.rendernote(self.note)
return x
class Validator:
def __deepcopy__(self, memo): return copy.copy(self)
def __init__(self, msg, test, jstest=None): utils.autoassign(self, locals())
def valid(self, value):
try: return self.test(value)
except: return False
notnull = Validator("Required", bool)
class regexp(Validator):
def __init__(self, rexp, msg):
self.rexp = re.compile(rexp)
self.msg = msg
def valid(self, value):
return bool(self.rexp.match(value))
if __name__ == "__main__":
import doctest
doctest.testmod()

138
web/http.py Normal file
View File

@ -0,0 +1,138 @@
"""
HTTP Utilities
(from web.py)
"""
__all__ = [
"expires", "lastmodified",
"prefixurl", "modified",
"write",
"changequery", "url",
"profiler",
]
import sys, os, threading, urllib, urlparse
try: import datetime
except ImportError: pass
import net, utils, webapi as web
def prefixurl(base=''):
"""
Sorry, this function is really difficult to explain.
Maybe some other time.
"""
url = web.ctx.path.lstrip('/')
for i in xrange(url.count('/')):
base += '../'
if not base:
base = './'
return base
def expires(delta):
"""
Outputs an `Expires` header for `delta` from now.
`delta` is a `timedelta` object or a number of seconds.
"""
if isinstance(delta, (int, long)):
delta = datetime.timedelta(seconds=delta)
date_obj = datetime.datetime.utcnow() + delta
web.header('Expires', net.httpdate(date_obj))
def lastmodified(date_obj):
"""Outputs a `Last-Modified` header for `datetime`."""
web.header('Last-Modified', net.httpdate(date_obj))
def modified(date=None, etag=None):
n = set(x.strip('" ') for x in web.ctx.env.get('HTTP_IF_NONE_MATCH', '').split(','))
m = net.parsehttpdate(web.ctx.env.get('HTTP_IF_MODIFIED_SINCE', '').split(';')[0])
validate = False
if etag:
if '*' in n or etag in n:
validate = True
if date and m:
# we subtract a second because
# HTTP dates don't have sub-second precision
if date-datetime.timedelta(seconds=1) <= m:
validate = True
if validate: web.ctx.status = '304 Not Modified'
return not validate
def write(cgi_response):
"""
Converts a standard CGI-style string response into `header` and
`output` calls.
"""
cgi_response = str(cgi_response)
cgi_response.replace('\r\n', '\n')
head, body = cgi_response.split('\n\n', 1)
lines = head.split('\n')
for line in lines:
if line.isspace():
continue
hdr, value = line.split(":", 1)
value = value.strip()
if hdr.lower() == "status":
web.ctx.status = value
else:
web.header(hdr, value)
web.output(body)
def urlencode(query):
"""
Same as urllib.urlencode, but supports unicode strings.
>>> urlencode({'text':'foo bar'})
'text=foo+bar'
"""
query = dict([(k, utils.utf8(v)) for k, v in query.items()])
return urllib.urlencode(query)
def changequery(query=None, **kw):
"""
Imagine you're at `/foo?a=1&b=2`. Then `changequery(a=3)` will return
`/foo?a=3&b=2` -- the same URL but with the arguments you requested
changed.
"""
if query is None:
query = web.input(_method='get')
for k, v in kw.iteritems():
if v is None:
query.pop(k, None)
else:
query[k] = v
out = web.ctx.path
if query:
out += '?' + urlencode(query)
return out
def url(path=None, **kw):
"""
Makes url by concatinating web.ctx.homepath and path and the
query string created using the arguments.
"""
if path is None:
path = web.ctx.path
if path.startswith("/"):
out = web.ctx.homepath + path
else:
out = path
if kw:
out += '?' + urlencode(kw)
return out
def profiler(app):
"""Outputs basic profiling information at the bottom of each response."""
from utils import profile
def profile_internal(e, o):
out, result = profile(app)(e, o)
return out + ['<pre>' + net.websafe(result) + '</pre>']
return profile_internal
if __name__ == "__main__":
import doctest
doctest.testmod()

225
web/httpserver.py Normal file
View File

@ -0,0 +1,225 @@
__all__ = ["runsimple"]
import sys, os
import webapi as web
import net
import utils
def runbasic(func, server_address=("0.0.0.0", 8080)):
"""
Runs a simple HTTP server hosting WSGI app `func`. The directory `static/`
is hosted statically.
Based on [WsgiServer][ws] from [Colin Stewart][cs].
[ws]: http://www.owlfish.com/software/wsgiutils/documentation/wsgi-server-api.html
[cs]: http://www.owlfish.com/
"""
# Copyright (c) 2004 Colin Stewart (http://www.owlfish.com/)
# Modified somewhat for simplicity
# Used under the modified BSD license:
# http://www.xfree86.org/3.3.6/COPYRIGHT2.html#5
import SimpleHTTPServer, SocketServer, BaseHTTPServer, urlparse
import socket, errno
import traceback
class WSGIHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
def run_wsgi_app(self):
protocol, host, path, parameters, query, fragment = \
urlparse.urlparse('http://dummyhost%s' % self.path)
# we only use path, query
env = {'wsgi.version': (1, 0)
,'wsgi.url_scheme': 'http'
,'wsgi.input': self.rfile
,'wsgi.errors': sys.stderr
,'wsgi.multithread': 1
,'wsgi.multiprocess': 0
,'wsgi.run_once': 0
,'REQUEST_METHOD': self.command
,'REQUEST_URI': self.path
,'PATH_INFO': path
,'QUERY_STRING': query
,'CONTENT_TYPE': self.headers.get('Content-Type', '')
,'CONTENT_LENGTH': self.headers.get('Content-Length', '')
,'REMOTE_ADDR': self.client_address[0]
,'SERVER_NAME': self.server.server_address[0]
,'SERVER_PORT': str(self.server.server_address[1])
,'SERVER_PROTOCOL': self.request_version
}
for http_header, http_value in self.headers.items():
env ['HTTP_%s' % http_header.replace('-', '_').upper()] = \
http_value
# Setup the state
self.wsgi_sent_headers = 0
self.wsgi_headers = []
try:
# We have there environment, now invoke the application
result = self.server.app(env, self.wsgi_start_response)
try:
try:
for data in result:
if data:
self.wsgi_write_data(data)
finally:
if hasattr(result, 'close'):
result.close()
except socket.error, socket_err:
# Catch common network errors and suppress them
if (socket_err.args[0] in \
(errno.ECONNABORTED, errno.EPIPE)):
return
except socket.timeout, socket_timeout:
return
except:
print >> web.debug, traceback.format_exc(),
if (not self.wsgi_sent_headers):
# We must write out something!
self.wsgi_write_data(" ")
return
do_POST = run_wsgi_app
do_PUT = run_wsgi_app
do_DELETE = run_wsgi_app
def do_GET(self):
if self.path.startswith('/static/'):
SimpleHTTPServer.SimpleHTTPRequestHandler.do_GET(self)
else:
self.run_wsgi_app()
def wsgi_start_response(self, response_status, response_headers,
exc_info=None):
if (self.wsgi_sent_headers):
raise Exception \
("Headers already sent and start_response called again!")
# Should really take a copy to avoid changes in the application....
self.wsgi_headers = (response_status, response_headers)
return self.wsgi_write_data
def wsgi_write_data(self, data):
if (not self.wsgi_sent_headers):
status, headers = self.wsgi_headers
# Need to send header prior to data
status_code = status[:status.find(' ')]
status_msg = status[status.find(' ') + 1:]
self.send_response(int(status_code), status_msg)
for header, value in headers:
self.send_header(header, value)
self.end_headers()
self.wsgi_sent_headers = 1
# Send the data
self.wfile.write(data)
class WSGIServer(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
def __init__(self, func, server_address):
BaseHTTPServer.HTTPServer.__init__(self,
server_address,
WSGIHandler)
self.app = func
self.serverShuttingDown = 0
print "http://%s:%d/" % server_address
WSGIServer(func, server_address).serve_forever()
def runsimple(func, server_address=("0.0.0.0", 8080)):
"""
Runs [CherryPy][cp] WSGI server hosting WSGI app `func`.
The directory `static/` is hosted statically.
[cp]: http://www.cherrypy.org
"""
from wsgiserver import CherryPyWSGIServer
from SimpleHTTPServer import SimpleHTTPRequestHandler
from BaseHTTPServer import BaseHTTPRequestHandler
class StaticApp(SimpleHTTPRequestHandler):
"""WSGI application for serving static files."""
def __init__(self, environ, start_response):
self.headers = []
self.environ = environ
self.start_response = start_response
def send_response(self, status, msg=""):
self.status = str(status) + " " + msg
def send_header(self, name, value):
self.headers.append((name, value))
def end_headers(self):
pass
def log_message(*a): pass
def __iter__(self):
environ = self.environ
self.path = environ.get('PATH_INFO', '')
self.client_address = environ.get('REMOTE_ADDR','-'), \
environ.get('REMOTE_PORT','-')
self.command = environ.get('REQUEST_METHOD', '-')
from cStringIO import StringIO
self.wfile = StringIO() # for capturing error
f = self.send_head()
self.start_response(self.status, self.headers)
if f:
block_size = 16 * 1024
while True:
buf = f.read(block_size)
if not buf:
break
yield buf
f.close()
else:
value = self.wfile.getvalue()
yield value
class WSGIWrapper(BaseHTTPRequestHandler):
"""WSGI wrapper for logging the status and serving static files."""
def __init__(self, app):
self.app = app
self.format = '%s - - [%s] "%s %s %s" - %s'
def __call__(self, environ, start_response):
def xstart_response(status, response_headers, *args):
write = start_response(status, response_headers, *args)
self.log(status, environ)
return write
path = environ.get('PATH_INFO', '')
if path.startswith('/static/'):
return StaticApp(environ, xstart_response)
else:
return self.app(environ, xstart_response)
def log(self, status, environ):
outfile = environ.get('wsgi.errors', web.debug)
req = environ.get('PATH_INFO', '_')
protocol = environ.get('ACTUAL_SERVER_PROTOCOL', '-')
method = environ.get('REQUEST_METHOD', '-')
host = "%s:%s" % (environ.get('REMOTE_ADDR','-'),
environ.get('REMOTE_PORT','-'))
#@@ It is really bad to extend from
#@@ BaseHTTPRequestHandler just for this method
time = self.log_date_time_string()
msg = self.format % (host, time, protocol, method, req, status)
print >> outfile, utils.safestr(msg)
func = WSGIWrapper(func)
server = CherryPyWSGIServer(server_address, func, server_name="localhost")
print "http://%s:%d/" % server_address
try:
server.start()
except KeyboardInterrupt:
server.stop()

190
web/net.py Normal file
View File

@ -0,0 +1,190 @@
"""
Network Utilities
(from web.py)
"""
__all__ = [
"validipaddr", "validipport", "validip", "validaddr",
"urlquote",
"httpdate", "parsehttpdate",
"htmlquote", "htmlunquote", "websafe",
]
import urllib, time
try: import datetime
except ImportError: pass
def validipaddr(address):
"""
Returns True if `address` is a valid IPv4 address.
>>> validipaddr('192.168.1.1')
True
>>> validipaddr('192.168.1.800')
False
>>> validipaddr('192.168.1')
False
"""
try:
octets = address.split('.')
if len(octets) != 4:
return False
for x in octets:
if not (0 <= int(x) <= 255):
return False
except ValueError:
return False
return True
def validipport(port):
"""
Returns True if `port` is a valid IPv4 port.
>>> validipport('9000')
True
>>> validipport('foo')
False
>>> validipport('1000000')
False
"""
try:
if not (0 <= int(port) <= 65535):
return False
except ValueError:
return False
return True
def validip(ip, defaultaddr="0.0.0.0", defaultport=8080):
"""Returns `(ip_address, port)` from string `ip_addr_port`"""
addr = defaultaddr
port = defaultport
ip = ip.split(":", 1)
if len(ip) == 1:
if not ip[0]:
pass
elif validipaddr(ip[0]):
addr = ip[0]
elif validipport(ip[0]):
port = int(ip[0])
else:
raise ValueError, ':'.join(ip) + ' is not a valid IP address/port'
elif len(ip) == 2:
addr, port = ip
if not validipaddr(addr) and validipport(port):
raise ValueError, ':'.join(ip) + ' is not a valid IP address/port'
port = int(port)
else:
raise ValueError, ':'.join(ip) + ' is not a valid IP address/port'
return (addr, port)
def validaddr(string_):
"""
Returns either (ip_address, port) or "/path/to/socket" from string_
>>> validaddr('/path/to/socket')
'/path/to/socket'
>>> validaddr('8000')
('0.0.0.0', 8000)
>>> validaddr('127.0.0.1')
('127.0.0.1', 8080)
>>> validaddr('127.0.0.1:8000')
('127.0.0.1', 8000)
>>> validaddr('fff')
Traceback (most recent call last):
...
ValueError: fff is not a valid IP address/port
"""
if '/' in string_:
return string_
else:
return validip(string_)
def urlquote(val):
"""
Quotes a string for use in a URL.
>>> urlquote('://?f=1&j=1')
'%3A//%3Ff%3D1%26j%3D1'
>>> urlquote(None)
''
>>> urlquote(u'\u203d')
'%E2%80%BD'
"""
if val is None: return ''
if not isinstance(val, unicode): val = str(val)
else: val = val.encode('utf-8')
return urllib.quote(val)
def httpdate(date_obj):
"""
Formats a datetime object for use in HTTP headers.
>>> import datetime
>>> httpdate(datetime.datetime(1970, 1, 1, 1, 1, 1))
'Thu, 01 Jan 1970 01:01:01 GMT'
"""
return date_obj.strftime("%a, %d %b %Y %H:%M:%S GMT")
def parsehttpdate(string_):
"""
Parses an HTTP date into a datetime object.
>>> parsehttpdate('Thu, 01 Jan 1970 01:01:01 GMT')
datetime.datetime(1970, 1, 1, 1, 1, 1)
"""
try:
t = time.strptime(string_, "%a, %d %b %Y %H:%M:%S %Z")
except ValueError:
return None
return datetime.datetime(*t[:6])
def htmlquote(text):
"""
Encodes `text` for raw use in HTML.
>>> htmlquote("<'&\\">")
'&lt;&#39;&amp;&quot;&gt;'
"""
text = text.replace("&", "&amp;") # Must be done first!
text = text.replace("<", "&lt;")
text = text.replace(">", "&gt;")
text = text.replace("'", "&#39;")
text = text.replace('"', "&quot;")
return text
def htmlunquote(text):
"""
Decodes `text` that's HTML quoted.
>>> htmlunquote('&lt;&#39;&amp;&quot;&gt;')
'<\\'&">'
"""
text = text.replace("&quot;", '"')
text = text.replace("&#39;", "'")
text = text.replace("&gt;", ">")
text = text.replace("&lt;", "<")
text = text.replace("&amp;", "&") # Must be done last!
return text
def websafe(val):
"""
Converts `val` so that it's safe for use in UTF-8 HTML.
>>> websafe("<'&\\">")
'&lt;&#39;&amp;&quot;&gt;'
>>> websafe(None)
''
>>> websafe(u'\u203d')
'\\xe2\\x80\\xbd'
"""
if val is None:
return ''
if isinstance(val, unicode):
val = val.encode('utf-8')
val = str(val)
return htmlquote(val)
if __name__ == "__main__":
import doctest
doctest.testmod()

317
web/session.py Normal file
View File

@ -0,0 +1,317 @@
"""
Session Management
(from web.py)
"""
import os, time, datetime, random, base64
try:
import cPickle as pickle
except ImportError:
import pickle
try:
import hashlib
sha1 = hashlib.sha1
except ImportError:
import sha
sha1 = sha.new
import utils
import webapi as web
__all__ = [
'Session', 'SessionExpired',
'Store', 'DiskStore', 'DBStore',
]
web.config.session_parameters = utils.storage({
'cookie_name': 'webpy_session_id',
'cookie_domain': None,
'timeout': 86400, #24 * 60 * 60, # 24 hours in seconds
'ignore_expiry': True,
'ignore_change_ip': True,
'secret_key': 'fLjUfxqXtfNoIldA0A0J',
'expired_message': 'Session expired',
})
class SessionExpired(web.HTTPError):
def __init__(self, message):
web.HTTPError.__init__(self, '200 OK', {}, data=message)
class Session(utils.ThreadedDict):
"""Session management for web.py
"""
def __init__(self, app, store, initializer=None):
self.__dict__['store'] = store
self.__dict__['_initializer'] = initializer
self.__dict__['_last_cleanup_time'] = 0
self.__dict__['_config'] = utils.storage(web.config.session_parameters)
if app:
app.add_processor(self._processor)
def _processor(self, handler):
"""Application processor to setup session for every request"""
self._cleanup()
self._load()
try:
return handler()
finally:
self._save()
def _load(self):
"""Load the session from the store, by the id from cookie"""
cookie_name = self._config.cookie_name
cookie_domain = self._config.cookie_domain
self.session_id = web.cookies().get(cookie_name)
# protection against session_id tampering
if self.session_id and not self._valid_session_id(self.session_id):
self.session_id = None
self._check_expiry()
if self.session_id:
d = self.store[self.session_id]
self.update(d)
self._validate_ip()
if not self.session_id:
self.session_id = self._generate_session_id()
if self._initializer:
if isinstance(self._initializer, dict):
self.update(self._initializer)
elif hasattr(self._initializer, '__call__'):
self._initializer()
self.ip = web.ctx.ip
def _check_expiry(self):
# check for expiry
if self.session_id and self.session_id not in self.store:
if self._config.ignore_expiry:
self.session_id = None
else:
return self.expired()
def _validate_ip(self):
# check for change of IP
if self.session_id and self.get('ip', None) != web.ctx.ip:
if not self._config.ignore_change_ip:
return self.expired()
def _save(self):
cookie_name = self._config.cookie_name
cookie_domain = self._config.cookie_domain
if not self.get('_killed'):
web.setcookie(cookie_name, self.session_id, domain=cookie_domain)
self.store[self.session_id] = dict(self)
else:
web.setcookie(cookie_name, self.session_id, expires=-1, domain=cookie_domain)
def _generate_session_id(self):
"""Generate a random id for session"""
while True:
rand = os.urandom(16)
now = time.time()
secret_key = self._config.secret_key
session_id = sha1("%s%s%s%s" %(rand, now, utils.safestr(web.ctx.ip), secret_key))
session_id = session_id.hexdigest()
if session_id not in self.store:
break
return session_id
def _valid_session_id(self, session_id):
rx = utils.re_compile('^[0-9a-fA-F]+$')
return rx.match(session_id)
def _cleanup(self):
"""Cleanup the stored sessions"""
current_time = time.time()
timeout = self._config.timeout
if current_time - self._last_cleanup_time > timeout:
self.store.cleanup(timeout)
self.__dict__['_last_cleanup_time'] = current_time
def expired(self):
"""Called when an expired session is atime"""
raise SessionExpired(self._config.expired_message)
def kill(self):
"""Kill the session, make it no longer available"""
del self.store[self.session_id]
self._killed = True
class Store:
"""Base class for session stores"""
def __contains__(self, key):
raise NotImplemented
def __getitem__(self, key):
raise NotImplemented
def __setitem__(self, key, value):
raise NotImplemented
def cleanup(self, timeout):
"""removes all the expired sessions"""
raise NotImplemented
def encode(self, session_dict):
"""encodes session dict as a string"""
pickled = pickle.dumps(session_dict)
return base64.encodestring(pickled)
def decode(self, session_data):
"""decodes the data to get back the session dict """
pickled = base64.decodestring(session_data)
return pickle.loads(pickled)
class DiskStore(Store):
"""
Store for saving a session on disk.
>>> import tempfile
>>> root = tempfile.mkdtemp()
>>> s = DiskStore(root)
>>> s['a'] = 'foo'
>>> s['a']
'foo'
>>> time.sleep(0.01)
>>> s.cleanup(0.01)
>>> s['a']
Traceback (most recent call last):
...
KeyError: 'a'
"""
def __init__(self, root):
# if the storage root doesn't exists, create it.
if not os.path.exists(root):
os.mkdir(root)
self.root = root
def _get_path(self, key):
if os.path.sep in key:
raise ValueError, "Bad key: %s" % repr(key)
return os.path.join(self.root, key)
def __contains__(self, key):
path = self._get_path(key)
return os.path.exists(path)
def __getitem__(self, key):
path = self._get_path(key)
if os.path.exists(path):
pickled = open(path).read()
return self.decode(pickled)
else:
raise KeyError, key
def __setitem__(self, key, value):
path = self._get_path(key)
pickled = self.encode(value)
try:
f = open(path, 'w')
try:
f.write(pickled)
finally:
f.close()
except IOError:
pass
def __delitem__(self, key):
path = self._get_path(key)
if os.path.exists(path):
os.remove(path)
def cleanup(self, timeout):
now = time.time()
for f in os.listdir(self.root):
path = self._get_path(f)
atime = os.stat(path).st_atime
if now - atime > timeout :
os.remove(path)
class DBStore(Store):
"""Store for saving a session in database
Needs a table with the following columns:
session_id CHAR(128) UNIQUE NOT NULL,
atime DATETIME NOT NULL default current_timestamp,
data TEXT
"""
def __init__(self, db, table_name):
self.db = db
self.table = table_name
def __contains__(self, key):
data = self.db.select(self.table, where="session_id=$key", vars=locals())
return bool(list(data))
def __getitem__(self, key):
now = datetime.datetime.now()
try:
s = self.db.select(self.table, where="session_id=$key", vars=locals())[0]
self.db.update(self.table, where="session_id=$key", atime=now, vars=locals())
except IndexError:
raise KeyError
else:
return self.decode(s.data)
def __setitem__(self, key, value):
pickled = self.encode(value)
now = datetime.datetime.now()
if key in self:
self.db.update(self.table, where="session_id=$key", data=pickled, vars=locals())
else:
self.db.insert(self.table, False, session_id=key, data=pickled )
def __delitem__(self, key):
self.db.delete(self.table, where="session_id=$key", vars=locals())
def cleanup(self, timeout):
timeout = datetime.timedelta(timeout/(24.0*60*60)) #timedelta takes numdays as arg
last_allowed_time = datetime.datetime.now() - timeout
self.db.delete(self.table, where="$last_allowed_time > atime", vars=locals())
class ShelfStore:
"""Store for saving session using `shelve` module.
import shelve
store = ShelfStore(shelve.open('session.shelf'))
XXX: is shelve thread-safe?
"""
def __init__(self, shelf):
self.shelf = shelf
def __contains__(self, key):
return key in self.shelf
def __getitem__(self, key):
atime, v = self.shelf[key]
self[k] = v # update atime
return v
def __setitem__(self, key, value):
self.shelf[key] = time.time(), value
def __delitem__(self, key):
try:
del self.shelf[key]
except KeyError:
pass
def cleanup(self, timeout):
now = time.time()
for k in self.shelf.keys():
atime, v = self.shelf[k]
if now - atime > timeout :
del self[k]
if __name__ == '__main__' :
import doctest
doctest.testmod()

1376
web/template.py Normal file

File diff suppressed because it is too large Load Diff

51
web/test.py Normal file
View File

@ -0,0 +1,51 @@
"""test utilities
(part of web.py)
"""
import unittest
import sys, os
import web
TestCase = unittest.TestCase
TestSuite = unittest.TestSuite
def load_modules(names):
return [__import__(name, None, None, "x") for name in names]
def module_suite(module, classnames=None):
"""Makes a suite from a module."""
if classnames:
return unittest.TestLoader().loadTestsFromNames(classnames, module)
elif hasattr(module, 'suite'):
return module.suite()
else:
return unittest.TestLoader().loadTestsFromModule(module)
def doctest_suite(module_names):
"""Makes a test suite from doctests."""
import doctest
suite = TestSuite()
for mod in load_modules(module_names):
suite.addTest(doctest.DocTestSuite(mod))
return suite
def suite(module_names):
"""Creates a suite from multiple modules."""
suite = TestSuite()
for mod in load_modules(module_names):
suite.addTest(module_suite(mod))
return suite
def runTests(suite):
runner = unittest.TextTestRunner()
return runner.run(suite)
def main(suite=None):
if not suite:
main_module = __import__('__main__')
# allow command line switches
args = [a for a in sys.argv[1:] if not a.startswith('-')]
suite = module_suite(main_module, args or None)
result = runTests(suite)
sys.exit(not result.wasSuccessful())

1017
web/utils.py Normal file

File diff suppressed because it is too large Load Diff

325
web/webapi.py Normal file
View File

@ -0,0 +1,325 @@
"""
Web API (wrapper around WSGI)
(from web.py)
"""
__all__ = [
"config",
"header", "debug",
"input", "data",
"setcookie", "cookies",
"ctx",
"HTTPError",
"BadRequest", "NotFound", "Gone", "InternalError",
"badrequest", "notfound", "gone", "internalerror",
"Redirect", "Found", "SeeOther", "TempRedirect",
"redirect", "found", "seeother", "tempredirect",
"NoMethod", "nomethod",
]
import sys, cgi, Cookie, pprint, urlparse, urllib
from utils import storage, storify, threadeddict, dictadd, intget, utf8
config = storage()
config.__doc__ = """
A configuration object for various aspects of web.py.
`debug`
: when True, enables reloading, disabled template caching and sets internalerror to debugerror.
"""
class HTTPError(Exception):
def __init__(self, status, headers, data=""):
ctx.status = status
for k, v in headers.items():
header(k, v)
self.data = data
Exception.__init__(self, status)
class BadRequest(HTTPError):
"""`400 Bad Request` error."""
message = "bad request"
def __init__(self):
status = "400 Bad Request"
headers = {'Content-Type': 'text/html'}
HTTPError.__init__(self, status, headers, self.message)
badrequest = BadRequest
class _NotFound(HTTPError):
"""`404 Not Found` error."""
message = "not found"
def __init__(self, message=None):
status = '404 Not Found'
headers = {'Content-Type': 'text/html'}
HTTPError.__init__(self, status, headers, message or self.message)
def NotFound(message=None):
"""Returns HTTPError with '404 Not Found' error from the active application.
"""
if message:
return _NotFound(message)
elif ctx.get('app_stack'):
return ctx.app_stack[-1].notfound()
else:
return _NotFound()
notfound = NotFound
class Gone(HTTPError):
"""`410 Gone` error."""
message = "gone"
def __init__(self):
status = '410 Gone'
headers = {'Content-Type': 'text/html'}
HTTPError.__init__(self, status, headers, self.message)
gone = Gone
class Redirect(HTTPError):
"""A `301 Moved Permanently` redirect."""
def __init__(self, url, status='301 Moved Permanently', absolute=False):
"""
Returns a `status` redirect to the new URL.
`url` is joined with the base URL so that things like
`redirect("about") will work properly.
"""
newloc = urlparse.urljoin(ctx.path, url)
if newloc.startswith('/'):
if absolute:
home = ctx.realhome
else:
home = ctx.home
newloc = home + newloc
headers = {
'Content-Type': 'text/html',
'Location': newloc
}
HTTPError.__init__(self, status, headers, "")
redirect = Redirect
class Found(Redirect):
"""A `302 Found` redirect."""
def __init__(self, url, absolute=False):
Redirect.__init__(self, url, '302 Found', absolute=absolute)
found = Found
class SeeOther(Redirect):
"""A `303 See Other` redirect."""
def __init__(self, url, absolute=False):
Redirect.__init__(self, url, '303 See Other', absolute=absolute)
seeother = SeeOther
class TempRedirect(Redirect):
"""A `307 Temporary Redirect` redirect."""
def __init__(self, url, absolute=False):
Redirect.__init__(self, url, '307 Temporary Redirect', absolute=absolute)
tempredirect = TempRedirect
class NoMethod(HTTPError):
"""A `405 Method Not Allowed` error."""
def __init__(self, cls=None):
status = '405 Method Not Allowed'
headers = {}
headers['Content-Type'] = 'text/html'
methods = ['GET', 'HEAD', 'POST', 'PUT', 'DELETE']
if cls:
methods = [method for method in methods if hasattr(cls, method)]
headers['Allow'] = ', '.join(methods)
data = None
HTTPError.__init__(self, status, headers, data)
nomethod = NoMethod
class _InternalError(HTTPError):
"""500 Internal Server Error`."""
message = "internal server error"
def __init__(self, message=None):
status = '500 Internal Server Error'
headers = {'Content-Type': 'text/html'}
HTTPError.__init__(self, status, headers, message or self.message)
def InternalError(message=None):
"""Returns HTTPError with '500 internal error' error from the active application.
"""
if message:
return _InternalError(message)
elif ctx.get('app_stack'):
return ctx.app_stack[-1].internalerror()
else:
return _InternalError()
internalerror = InternalError
def header(hdr, value, unique=False):
"""
Adds the header `hdr: value` with the response.
If `unique` is True and a header with that name already exists,
it doesn't add a new one.
"""
hdr, value = utf8(hdr), utf8(value)
# protection against HTTP response splitting attack
if '\n' in hdr or '\r' in hdr or '\n' in value or '\r' in value:
raise ValueError, 'invalid characters in header'
if unique is True:
for h, v in ctx.headers:
if h.lower() == hdr.lower(): return
ctx.headers.append((hdr, value))
def input(*requireds, **defaults):
"""
Returns a `storage` object with the GET and POST arguments.
See `storify` for how `requireds` and `defaults` work.
"""
from cStringIO import StringIO
def dictify(fs):
# hack to make web.input work with enctype='text/plain.
if fs.list is None:
fs.list = []
return dict([(k, fs[k]) for k in fs.keys()])
_method = defaults.pop('_method', 'both')
e = ctx.env.copy()
a = b = {}
if _method.lower() in ['both', 'post', 'put']:
if e['REQUEST_METHOD'] in ['POST', 'PUT']:
if e.get('CONTENT_TYPE', '').lower().startswith('multipart/'):
# since wsgi.input is directly passed to cgi.FieldStorage,
# it can not be called multiple times. Saving the FieldStorage
# object in ctx to allow calling web.input multiple times.
a = ctx.get('_fieldstorage')
if not a:
fp = e['wsgi.input']
a = cgi.FieldStorage(fp=fp, environ=e, keep_blank_values=1)
ctx._fieldstorage = a
else:
fp = StringIO(data())
a = cgi.FieldStorage(fp=fp, environ=e, keep_blank_values=1)
a = dictify(a)
if _method.lower() in ['both', 'get']:
e['REQUEST_METHOD'] = 'GET'
b = dictify(cgi.FieldStorage(environ=e, keep_blank_values=1))
out = dictadd(b, a)
try:
defaults.setdefault('_unicode', True) # force unicode conversion by default.
return storify(out, *requireds, **defaults)
except KeyError:
raise badrequest()
def data():
"""Returns the data sent with the request."""
if 'data' not in ctx:
cl = intget(ctx.env.get('CONTENT_LENGTH'), 0)
ctx.data = ctx.env['wsgi.input'].read(cl)
return ctx.data
def setcookie(name, value, expires="", domain=None, secure=False):
"""Sets a cookie."""
if expires < 0:
expires = -1000000000
kargs = {'expires': expires, 'path':'/'}
if domain:
kargs['domain'] = domain
if secure:
kargs['secure'] = secure
# @@ should we limit cookies to a different path?
cookie = Cookie.SimpleCookie()
cookie[name] = urllib.quote(utf8(value))
for key, val in kargs.iteritems():
cookie[name][key] = val
header('Set-Cookie', cookie.items()[0][1].OutputString())
def cookies(*requireds, **defaults):
"""
Returns a `storage` object with all the cookies in it.
See `storify` for how `requireds` and `defaults` work.
"""
cookie = Cookie.SimpleCookie()
cookie.load(ctx.env.get('HTTP_COOKIE', ''))
try:
d = storify(cookie, *requireds, **defaults)
for k, v in d.items():
d[k] = v and urllib.unquote(v)
return d
except KeyError:
badrequest()
raise StopIteration
def debug(*args):
"""
Prints a prettyprinted version of `args` to stderr.
"""
try:
out = ctx.environ['wsgi.errors']
except:
out = sys.stderr
for arg in args:
print >> out, pprint.pformat(arg)
return ''
def _debugwrite(x):
try:
out = ctx.environ['wsgi.errors']
except:
out = sys.stderr
out.write(x)
debug.write = _debugwrite
ctx = context = threadeddict()
ctx.__doc__ = """
A `storage` object containing various information about the request:
`environ` (aka `env`)
: A dictionary containing the standard WSGI environment variables.
`host`
: The domain (`Host` header) requested by the user.
`home`
: The base path for the application.
`ip`
: The IP address of the requester.
`method`
: The HTTP method used.
`path`
: The path request.
`query`
: If there are no query arguments, the empty string. Otherwise, a `?` followed
by the query string.
`fullpath`
: The full path requested, including query arguments (`== path + query`).
### Response Data
`status` (default: "200 OK")
: The status code to be used in the response.
`headers`
: A list of 2-tuples to be used in the response.
`output`
: A string to be used as the response.
"""

115
web/webopenid.py Normal file
View File

@ -0,0 +1,115 @@
"""openid.py: an openid library for web.py
Notes:
- This will create a file called .openid_secret_key in the
current directory with your secret key in it. If someone
has access to this file they can log in as any user. And
if the app can't find this file for any reason (e.g. you
moved the app somewhere else) then each currently logged
in user will get logged out.
- State must be maintained through the entire auth process
-- this means that if you have multiple web.py processes
serving one set of URLs or if you restart your app often
then log ins will fail. You have to replace sessions and
store for things to work.
- We set cookies starting with "openid_".
"""
import os
import random
import hmac
import __init__ as web
import openid.consumer.consumer
import openid.store.memstore
sessions = {}
store = openid.store.memstore.MemoryStore()
def _secret():
try:
secret = file('.openid_secret_key').read()
except IOError:
# file doesn't exist
secret = os.urandom(20)
file('.openid_secret_key', 'w').write(secret)
return secret
def _hmac(identity_url):
return hmac.new(_secret(), identity_url).hexdigest()
def _random_session():
n = random.random()
while n in sessions:
n = random.random()
n = str(n)
return n
def status():
oid_hash = web.cookies().get('openid_identity_hash', '').split(',', 1)
if len(oid_hash) > 1:
oid_hash, identity_url = oid_hash
if oid_hash == _hmac(identity_url):
return identity_url
return None
def form(openid_loc):
oid = status()
if oid:
return '''
<form method="post" action="%s">
<img src="http://openid.net/login-bg.gif" alt="OpenID" />
<strong>%s</strong>
<input type="hidden" name="action" value="logout" />
<input type="hidden" name="return_to" value="%s" />
<button type="submit">log out</button>
</form>''' % (openid_loc, oid, web.ctx.fullpath)
else:
return '''
<form method="post" action="%s">
<input type="text" name="openid" value=""
style="background: url(http://openid.net/login-bg.gif) no-repeat; padding-left: 18px; background-position: 0 50%%;" />
<input type="hidden" name="return_to" value="%s" />
<button type="submit">log in</button>
</form>''' % (openid_loc, web.ctx.fullpath)
def logout():
web.setcookie('openid_identity_hash', '', expires=-1)
class host:
def POST(self):
# unlike the usual scheme of things, the POST is actually called
# first here
i = web.input(return_to='/')
if i.get('action') == 'logout':
logout()
return web.redirect(i.return_to)
i = web.input('openid', return_to='/')
n = _random_session()
sessions[n] = {'webpy_return_to': i.return_to}
c = openid.consumer.consumer.Consumer(sessions[n], store)
a = c.begin(i.openid)
f = a.redirectURL(web.ctx.home, web.ctx.home + web.ctx.fullpath)
web.setcookie('openid_session_id', n)
return web.redirect(f)
def GET(self):
n = web.cookies('openid_session_id').openid_session_id
web.setcookie('openid_session_id', '', expires=-1)
return_to = sessions[n]['webpy_return_to']
c = openid.consumer.consumer.Consumer(sessions[n], store)
a = c.complete(web.input(), web.ctx.home + web.ctx.fullpath)
if a.status.lower() == 'success':
web.setcookie('openid_identity_hash', _hmac(a.identity_url) + ',' + a.identity_url)
del sessions[n]
return web.redirect(return_to)

65
web/wsgi.py Normal file
View File

@ -0,0 +1,65 @@
"""
WSGI Utilities
(from web.py)
"""
import os, sys
import http
import webapi as web
from utils import listget
from net import validaddr, validip
import httpserver
def runfcgi(func, addr=('localhost', 8000)):
"""Runs a WSGI function as a FastCGI server."""
import flup.server.fcgi as flups
return flups.WSGIServer(func, multiplexed=True, bindAddress=addr).run()
def runscgi(func, addr=('localhost', 4000)):
"""Runs a WSGI function as an SCGI server."""
import flup.server.scgi as flups
return flups.WSGIServer(func, bindAddress=addr).run()
def runwsgi(func):
"""
Runs a WSGI-compatible `func` using FCGI, SCGI, or a simple web server,
as appropriate based on context and `sys.argv`.
"""
if os.environ.has_key('SERVER_SOFTWARE'): # cgi
os.environ['FCGI_FORCE_CGI'] = 'Y'
if (os.environ.has_key('PHP_FCGI_CHILDREN') #lighttpd fastcgi
or os.environ.has_key('SERVER_SOFTWARE')):
return runfcgi(func, None)
if 'fcgi' in sys.argv or 'fastcgi' in sys.argv:
args = sys.argv[1:]
if 'fastcgi' in args: args.remove('fastcgi')
elif 'fcgi' in args: args.remove('fcgi')
if args:
return runfcgi(func, validaddr(args[0]))
else:
return runfcgi(func, None)
if 'scgi' in sys.argv:
args = sys.argv[1:]
args.remove('scgi')
if args:
return runscgi(func, validaddr(args[0]))
else:
return runscgi(func)
return httpserver.runsimple(func, validip(listget(sys.argv, 1, '')))
def _is_dev_mode():
# quick hack to check if the program is running in dev mode.
if os.environ.has_key('SERVER_SOFTWARE') \
or os.environ.has_key('PHP_FCGI_CHILDREN') \
or 'fcgi' in sys.argv or 'fastcgi' in sys.argv:
return False
return True
# When running the builtin-server, enable debug mode if not already set.
web.config.setdefault('debug', _is_dev_mode())

View File

@ -0,0 +1,25 @@
Copyright (c) 2004-2007, CherryPy Team (team@cherrypy.org)
All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name of the CherryPy Team nor the names of its contributors
may be used to endorse or promote products derived from this software
without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

1782
web/wsgiserver/__init__.py Normal file

File diff suppressed because it is too large Load Diff