from __future__ import print_function
import os import gzip import codecs import argparse from typing import IO, Union
import numpy as np
import torch import torch.nn as nn import torch.nn.functional as F import torch.optim as optim from torchvision import datasets, transforms from torch.optim.lr_scheduler import StepLR
import shutil
class Net(nn.Module):
    """Small convolutional classifier with two conv layers and two linear layers.

    ``forward`` returns log-probabilities over 10 classes (``log_softmax``
    along dim 1), which is the input ``F.nll_loss`` expects.
    """

    def __init__(self):
        super().__init__()
        # Layers are created in a fixed order so that seeded initialisation
        # and the state_dict key order stay stable.
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, stride=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1)
        self.dropout1 = nn.Dropout(p=0.25)
        self.dropout2 = nn.Dropout(p=0.5)
        # 9216 = 64 channels * 12 * 12, the flattened size after the two
        # 3x3 convolutions and the 2x2 max-pool for a 1x28x28 input.
        self.fc1 = nn.Linear(in_features=9216, out_features=128)
        self.fc2 = nn.Linear(in_features=128, out_features=10)

    def forward(self, x):
        """Map a batch of images to per-class log-probabilities."""
        features = F.relu(self.conv1(x))
        features = F.relu(self.conv2(features))
        features = self.dropout1(F.max_pool2d(features, 2))
        # Keep the batch dimension, collapse everything else.
        hidden = F.relu(self.fc1(torch.flatten(features, 1)))
        logits = self.fc2(self.dropout2(hidden))
        return F.log_softmax(logits, dim=1)
def train(args, model, device, train_loader, optimizer, epoch):
    """Run one optimisation pass over ``train_loader``.

    Args:
        args: namespace providing ``log_interval`` (batches between progress
            lines) and ``dry_run`` (stop after the first logged batch).
        model: module whose output is fed to ``F.nll_loss``.
        device: device the batches are moved to.
        train_loader: iterable of ``(data, target)`` batches; must expose
            ``dataset`` and ``__len__`` for the progress line.
        optimizer: optimizer stepping ``model``'s parameters.
        epoch: epoch number, used only in the progress line.
    """
    model.train()
    for step, (inputs, labels) in enumerate(train_loader):
        inputs = inputs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        loss = F.nll_loss(model(inputs), labels)
        loss.backward()
        optimizer.step()

        # Only every ``log_interval``-th batch reports progress.
        if step % args.log_interval != 0:
            continue

        samples_seen = step * len(inputs)
        samples_total = len(train_loader.dataset)
        percent_done = 100. * step / len(train_loader)
        print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
            epoch, samples_seen, samples_total, percent_done, loss.item()))

        # A dry run stops right after the first reported batch.
        if args.dry_run:
            break
def test(model, device, test_loader):
    """Evaluate ``model`` on ``test_loader`` and print loss and accuracy.

    Args:
        model: module whose output is fed to ``F.nll_loss``; it is switched
            to eval mode.
        device: device the batches are moved to.
        test_loader: iterable of ``(data, target)`` batches exposing
            ``dataset`` (its length is the averaging denominator).
    """
    model.eval()
    loss_sum = 0
    num_correct = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            log_probs = model(inputs)
            # Sum (not mean) per batch so the final average is per sample.
            loss_sum += F.nll_loss(log_probs, labels, reduction='sum').item()
            # Index of the highest log-probability is the predicted class.
            predicted = log_probs.argmax(dim=1, keepdim=True)
            num_correct += predicted.eq(labels.view_as(predicted)).sum().item()

    num_samples = len(test_loader.dataset)
    mean_loss = loss_sum / num_samples
    accuracy_pct = 100. * num_correct / num_samples

    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        mean_loss, num_correct, num_samples, accuracy_pct))
def get_int(b: bytes) -> int:
    """Interpret ``b`` as an unsigned big-endian integer.

    Args:
        b: the raw bytes of the integer, most significant byte first.

    Returns:
        The decoded non-negative integer.

    Raises:
        ValueError: if ``b`` is empty (there is no integer to decode).
    """
    # An empty field is rejected explicitly: int.from_bytes would silently
    # return 0 for it, hiding a truncated header.
    if len(b) == 0:
        raise ValueError('cannot parse an integer from an empty byte string')
    return int.from_bytes(b, 'big')
def open_maybe_compressed_file(path: Union[str, IO]) -> Union[IO, gzip.GzipFile]:
    """Return a binary file object that decompresses ``path`` on the fly if needed.

    Args:
        path: a filename, or an already-open file object.

    Returns:
        ``path`` itself when it is not a string; otherwise a file object
        opened for binary reading, wrapped in a gzip or lzma reader when the
        name ends with ``'.gz'`` or ``'.xz'`` respectively.
    """
    # Plain ``str`` check: the private ``torch._six`` helper is not available
    # in recent PyTorch releases and is not needed on Python 3.
    if not isinstance(path, str):
        return path
    if path.endswith('.gz'):
        return gzip.open(path, 'rb')
    if path.endswith('.xz'):
        # Imported lazily: only needed for .xz inputs, and the module is not
        # imported at the top of this file.
        import lzma
        return lzma.open(path, 'rb')
    return open(path, 'rb')
# Lookup table keyed by the element-type code taken from an SN3 file header
# (the `magic // 256` value computed in read_sn3_pascalvincent_tensor).
# Each value is a 3-tuple:
#   [0] the torch dtype for that code (not read by the code visible in this file),
#   [1] the numpy dtype used to decode the raw buffer; '>' marks big-endian
#       byte order for the multi-byte types,
#   [2] the numpy dtype the decoded array is cast to before it becomes a tensor.
# Code 10 has no entry.
SN3_PASCALVINCENT_TYPEMAP = {
    8: (torch.uint8, np.uint8, np.uint8),
    9: (torch.int8, np.int8, np.int8),
    11: (torch.int16, np.dtype('>i2'), 'i2'),
    12: (torch.int32, np.dtype('>i4'), 'i4'),
    13: (torch.float32, np.dtype('>f4'), 'f4'),
    14: (torch.float64, np.dtype('>f8'), 'f8')
}
def read_sn3_pascalvincent_tensor(path: Union[str, IO], strict: bool = True) -> torch.Tensor:
    """Read a SN3 file in "Pascal Vincent" format (Lush file 'libidx/idx-io.lsh').

    Args:
        path: a filename, compressed filename, or file object. The object
            returned by ``open_maybe_compressed_file`` is used as a context
            manager, so a file object passed in is closed on return.
        strict: when True, the number of decoded elements must equal the
            product of the dimensions declared in the header.

    Returns:
        A tensor viewed with the dimensions declared in the header.

    Raises:
        AssertionError: if the header declares an unsupported number of
            dimensions or element-type code, or (with ``strict``) the payload
            size does not match the declared dimensions.
    """
    with open_maybe_compressed_file(path) as f:
        data = f.read()

    # Header word: the low byte is the number of dimensions, the remaining
    # high part is the element-type code.
    magic = get_int(data[0:4])
    nd = magic % 256
    ty = magic // 256
    assert 1 <= nd <= 3, 'unsupported number of dimensions: {}'.format(nd)
    # Membership rather than a range check: code 10 lies inside 8..14 but has
    # no typemap entry, and would otherwise fail with a bare KeyError below.
    assert ty in SN3_PASCALVINCENT_TYPEMAP, 'unsupported element type code: {}'.format(ty)
    m = SN3_PASCALVINCENT_TYPEMAP[ty]

    # One 4-byte size per dimension follows the magic word.
    s = [get_int(data[4 * (i + 1): 4 * (i + 2)]) for i in range(nd)]

    # The payload starts right after the magic word and the nd size fields.
    # NOTE: np.frombuffer over an immutable bytes object yields a read-only
    # array, and astype(copy=False) does not copy when the dtype already
    # matches, so the returned tensor may share that read-only buffer.
    parsed = np.frombuffer(data, dtype=m[1], offset=(4 * (nd + 1)))
    assert parsed.shape[0] == np.prod(s) or not strict, \
        'payload holds {} elements but the header declares {}'.format(parsed.shape[0], s)
    return torch.from_numpy(parsed.astype(m[2], copy=False)).view(*s)
def read_label_file(path: str) -> torch.Tensor:
    """Load the label file at ``path`` as a 1-D int64 tensor.

    The file is parsed with ``read_sn3_pascalvincent_tensor`` (non-strict);
    the parsed tensor must be uint8 and one-dimensional.
    """
    with open(path, 'rb') as stream:
        labels = read_sn3_pascalvincent_tensor(stream, strict=False)
    assert labels.dtype == torch.uint8
    assert labels.ndimension() == 1
    # Widen from uint8 to int64 for the caller.
    return labels.long()
def read_image_file(path: str) -> torch.Tensor:
    """Load the image file at ``path`` as a 3-D uint8 tensor.

    The file is parsed with ``read_sn3_pascalvincent_tensor`` (non-strict);
    the parsed tensor must be uint8 and three-dimensional, and is returned
    without further conversion.
    """
    with open(path, 'rb') as stream:
        images = read_sn3_pascalvincent_tensor(stream, strict=False)
    assert images.dtype == torch.uint8
    assert images.ndimension() == 3
    return images
def extract_archive(from_path, to_path):
    """Decompress the gzip file ``from_path`` into the directory ``to_path``.

    The output file is named after ``from_path`` with its last extension
    removed (``foo-ubyte.gz`` -> ``foo-ubyte``).

    Args:
        from_path: path of the gzip-compressed source file.
        to_path: existing directory that receives the decompressed file.

    Raises:
        OSError: if the source cannot be opened or the destination cannot be
            written (for example ``FileNotFoundError`` for a missing archive).
    """
    target_name = os.path.splitext(os.path.basename(from_path))[0]
    to_path = os.path.join(to_path, target_name)
    # Open the source first so that a missing or unreadable archive does not
    # leave an empty destination file behind, then stream in chunks instead
    # of holding the whole decompressed payload in memory.
    with gzip.GzipFile(from_path) as zip_f, open(to_path, "wb") as out_f:
        shutil.copyfileobj(zip_f, out_f)
def convert_raw_mnist_dataset_to_pytorch_mnist_dataset(data_url):
    """Unpack the gzipped MNIST files under ``data_url`` and save them as tensors.

    Expected layout before the call::

        {data_url}/
            train-images-idx3-ubyte.gz
            train-labels-idx1-ubyte.gz
            t10k-images-idx3-ubyte.gz
            t10k-labels-idx1-ubyte.gz

    Layout after the call (the archives are left in place)::

        {data_url}/
            train-images-idx3-ubyte.gz
            train-labels-idx1-ubyte.gz
            t10k-images-idx3-ubyte.gz
            t10k-labels-idx1-ubyte.gz
            MNIST/raw/
                train-images-idx3-ubyte
                train-labels-idx1-ubyte
                t10k-images-idx3-ubyte
                t10k-labels-idx1-ubyte
            MNIST/processed/
                training.pt
                test.pt

    Each ``.pt`` file holds an ``(images, labels)`` tuple written with
    ``torch.save``.
    """
    archive_names = (
        "train-images-idx3-ubyte.gz",
        "train-labels-idx1-ubyte.gz",
        "t10k-images-idx3-ubyte.gz",
        "t10k-labels-idx1-ubyte.gz",
    )

    mnist_root = os.path.join(data_url, 'MNIST')
    raw_folder = os.path.join(mnist_root, 'raw')
    processed_folder = os.path.join(mnist_root, 'processed')
    for folder in (raw_folder, processed_folder):
        os.makedirs(folder, exist_ok=True)

    print('Processing...')

    # Step 1: decompress every archive into MNIST/raw.
    for archive in archive_names:
        extract_archive(os.path.join(data_url, archive), raw_folder)

    # Step 2: parse the raw files (images before labels, training before test).
    train_images = read_image_file(os.path.join(raw_folder, 'train-images-idx3-ubyte'))
    train_labels = read_label_file(os.path.join(raw_folder, 'train-labels-idx1-ubyte'))
    test_images = read_image_file(os.path.join(raw_folder, 't10k-images-idx3-ubyte'))
    test_labels = read_label_file(os.path.join(raw_folder, 't10k-labels-idx1-ubyte'))

    # Step 3: serialise each split as an (images, labels) tuple.
    outputs = (
        ('training.pt', (train_images, train_labels)),
        ('test.pt', (test_images, test_labels)),
    )
    for file_name, split in outputs:
        with open(os.path.join(processed_folder, file_name), 'wb') as out_file:
            torch.save(split, out_file)

    print('Done!')
def main():
    """Command-line entry point: prepare MNIST, train ``Net``, and export the model.

    Steps: parse arguments, convert the gzipped dataset under ``--data_url``
    into the ``MNIST/raw`` + ``MNIST/processed`` layout, train and evaluate
    for ``--epochs`` epochs, then write ``mnist_cnn.pt`` plus the inference
    files from ``infer/`` into ``{--train_url}/model``.

    Exits through ``parser.error`` (status 2) when a required path argument
    is missing.
    """
    parser = argparse.ArgumentParser(description='PyTorch MNIST Example')

    # Dataset input directory and model output directory.
    parser.add_argument('--data_url', type=str, default=False,
                        help='mnist dataset path')
    parser.add_argument('--train_url', type=str, default=False,
                        help='mnist model path')

    parser.add_argument('--batch-size', type=int, default=64, metavar='N',
                        help='input batch size for training (default: 64)')
    parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
                        help='input batch size for testing (default: 1000)')
    parser.add_argument('--epochs', type=int, default=14, metavar='N',
                        help='number of epochs to train (default: 14)')
    parser.add_argument('--lr', type=float, default=1.0, metavar='LR',
                        help='learning rate (default: 1.0)')
    parser.add_argument('--gamma', type=float, default=0.7, metavar='M',
                        help='Learning rate step gamma (default: 0.7)')
    parser.add_argument('--no-cuda', action='store_true', default=False,
                        help='disables CUDA training')
    parser.add_argument('--dry-run', action='store_true', default=False,
                        help='quickly check a single pass')
    parser.add_argument('--seed', type=int, default=1, metavar='S',
                        help='random seed (default: 1)')
    parser.add_argument('--log-interval', type=int, default=10, metavar='N',
                        help='how many batches to wait before logging training status')
    # NOTE(review): store_true combined with default=True means this option
    # is always True; left unchanged so existing invocations behave the same.
    parser.add_argument('--save-model', action='store_true', default=True,
                        help='For Saving the current Model')
    args = parser.parse_args()

    # Fail fast on missing paths. Without these checks the False defaults
    # reach os.path.join and raise TypeError -- for --train_url only after
    # all epochs have already run.
    if not args.data_url:
        parser.error('--data_url is required (directory containing the MNIST .gz files)')
    if args.save_model and not args.train_url:
        parser.error('--train_url is required to save the trained model')

    use_cuda = not args.no_cuda and torch.cuda.is_available()

    torch.manual_seed(args.seed)

    device = torch.device("cuda" if use_cuda else "cpu")

    # Shuffling is a property of the split, not of the device: training data
    # is always shuffled and evaluation data never is.
    train_kwargs = {'batch_size': args.batch_size, 'shuffle': True}
    test_kwargs = {'batch_size': args.test_batch_size, 'shuffle': False}
    if use_cuda:
        cuda_kwargs = {'num_workers': 1, 'pin_memory': True}
        train_kwargs.update(cuda_kwargs)
        test_kwargs.update(cuda_kwargs)

    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])

    # Build the MNIST/raw and MNIST/processed folders that the
    # download=False dataset objects below read from.
    convert_raw_mnist_dataset_to_pytorch_mnist_dataset(args.data_url)

    dataset1 = datasets.MNIST(args.data_url, train=True, download=False,
                              transform=transform)
    dataset2 = datasets.MNIST(args.data_url, train=False, download=False,
                              transform=transform)

    train_loader = torch.utils.data.DataLoader(dataset1, **train_kwargs)
    test_loader = torch.utils.data.DataLoader(dataset2, **test_kwargs)

    model = Net().to(device)
    optimizer = optim.Adadelta(model.parameters(), lr=args.lr)
    # Multiply the learning rate by gamma after every epoch.
    scheduler = StepLR(optimizer, step_size=1, gamma=args.gamma)

    for epoch in range(1, args.epochs + 1):
        train(args, model, device, train_loader, optimizer, epoch)
        test(model, device, test_loader)
        scheduler.step()

    if args.save_model:
        model_path = os.path.join(args.train_url, 'model')
        os.makedirs(model_path, exist_ok=True)

        torch.save(model.state_dict(), os.path.join(model_path, 'mnist_cnn.pt'))

        # Ship the inference files that sit next to this script alongside
        # the saved weights.
        the_path_of_current_file = os.path.dirname(__file__)
        shutil.copyfile(os.path.join(the_path_of_current_file, 'infer/customize_service.py'),
                        os.path.join(model_path, 'customize_service.py'))
        shutil.copyfile(os.path.join(the_path_of_current_file, 'infer/config.json'),
                        os.path.join(model_path, 'config.json'))
# Run the training pipeline only when this file is executed as a script,
# not when it is imported.
if __name__ == '__main__':
    main()