1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
|
#!/usr/bin/env python
#
# Copyright 2012 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# Derived from oauth2.py (https://github.com/google/gmail-oauth2-tools).
# Heavily modified and rewritten by Stefan Krah.
#
import os
import sys
import json
import argparse
import time
try:
import urllib.request as urllibrequest
import urllib.parse as urllibparse
raw_input = input
except: #py2
import urllib as urllibparse
urllibrequest = urllibparse
class OAuth2(object):
def __init__(self, token_data_path):
self.token_data_path = token_data_path
with open(self.token_data_path) as f:
self.data = json.load(f)
def copy(self, *keys):
data = {}
for k in keys:
data[k] = self.data[k]
return data
def query(self, params):
lst = []
for param in sorted(params.items(), key=lambda x: x[0]):
escaped = urllibparse.quote(param[1], safe='~-._')
lst.append('%s=%s' % (param[0], escaped))
return '&'.join(lst)
def code_url(self):
params = self.copy('scope', 'client_id', 'redirect_uri')
params['response_type'] = 'code'
return '%s?%s' % (self.data['auth_uri'], self.query(params))
def get_response(self, url, params):
encoded = urllibparse.urlencode(params).encode('ascii')
response = urllibrequest.urlopen(url, encoded).read()
return json.loads(response)
def update_config(self, d):
self.data['access_token'] = d['access_token']
self.data['expires_at'] = time.time() + d['expires_in'] - 100
refresh_token = d.get('refresh_token')
if refresh_token is not None:
self.data['refresh_token'] = refresh_token
with open(self.token_data_path, "w") as f:
json.dump(self.data, f)
def init_tokens(self, code):
params = self.copy('user', 'client_id', 'client_secret',
'redirect_uri')
params['code'] = code
params['grant_type'] = 'authorization_code'
d = self.get_response(self.data['token_uri'], params)
self.update_config(d)
def refresh_tokens(self):
params = self.copy('client_id', 'client_secret', 'refresh_token')
params['grant_type'] = 'refresh_token'
d = self.get_response(self.data['token_uri'], params)
self.update_config(d)
def token(self):
if time.time() >= self.data.get('expires_at'):
self.refresh_tokens()
return self.data['access_token']
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument("-i", "--init", action="store_true", default=False,
help="initialize access and refresh tokens")
parser.add_argument('tokenfile', metavar='<token data file path>',
help="location of the token data file")
args = parser.parse_args()
auth = OAuth2(args.tokenfile)
if args.init:
print("Visit this url to obtain a verification code:")
print(" %s\n" % auth.code_url())
code = raw_input("Enter verification code: ")
response = auth.init_tokens(code)
else:
sys.stdout.write("%s" % auth.token())
sys.exit(0)
|